mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

chore(arrow) bump apache arrow version and fix test load generator for memory usage (#3149)

* bump apache arrow version and fix test load generator for memory usage
* limit arrow simd by SSE4.2
Author: Leonid Fedorov
Date: 2024-03-22 19:47:55 +03:00
Committed by: GitHub
Parent: 49757ba8d5
Commit: 9fe6efe84a
2 changed files with 362 additions and 350 deletions
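
The load-generator change below replaces a generateLargeTable() that materialized every column for all rows in one arrow::Table with a LargeDataProducer (an arrow::RecordBatchReader) whose bounded chunks are written out one Parquet row group at a time. A condensed, self-contained sketch of that streaming pattern follows; writeStreaming and its arguments are illustrative names, not part of the commit, and the real wiring lives in the rewritten generateLargeTable() further down.

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include <memory>
#include <string>

// Write each RecordBatch from `reader` as its own row group, so peak memory is
// bounded by one chunk rather than by the total row count.
void writeStreaming(std::shared_ptr<arrow::RecordBatchReader> reader, const std::string& path)
{
  std::shared_ptr<arrow::io::FileOutputStream> outfile;
  PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(path));

  std::unique_ptr<parquet::arrow::FileWriter> writer;
  PARQUET_ASSIGN_OR_THROW(writer, parquet::arrow::FileWriter::Open(
                                      *reader->schema(), arrow::default_memory_pool(), outfile,
                                      parquet::WriterProperties::Builder().build(),
                                      parquet::ArrowWriterProperties::Builder().build()));

  for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *reader)
  {
    PARQUET_ASSIGN_OR_THROW(auto batch, maybe_batch);
    // One small Table per batch; only this chunk is alive at any point.
    PARQUET_ASSIGN_OR_THROW(auto table, arrow::Table::FromRecordBatches(batch->schema(), {batch}));
    PARQUET_THROW_NOT_OK(writer->WriteTable(*table, batch->num_rows()));
  }
  PARQUET_THROW_NOT_OK(writer->Close());
  PARQUET_THROW_NOT_OK(outfile->Close());
}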

@@ -31,14 +31,15 @@ set(ARROW_CMAKE_ARGS "-DCMAKE_INSTALL_PREFIX=${ARROW_PREFIX}"
"-DARROW_DATASET=ON"
"-DARROW_PARQUET=ON"
"-DARROW_FILESYSTEM=ON"
"-DARROW_RUNTIME_SIMD_LEVEL=SSE4_2"
"-DThrift_ROOT=${CMAKE_CURRENT_BINARY_DIR}/external/thrift"
)
set(ARROW_INCLUDE_DIR "${ARROW_PREFIX}/include")
set(ARROW_BUILD_BYPRODUCTS "${ARROW_STATIC_LIB}" "${PARQUET_STATIC_LIB}")
externalproject_add(external_arrow
URL https://github.com/apache/arrow/archive/refs/tags/go/v13.0.0.tar.gz
URL_HASH SHA256=ea4a79a4103379573ecbcf19229437a4ba547c0146a7f3c3be0a7e0b3de5de6c
URL https://github.com/apache/arrow/archive/refs/tags/apache-arrow-15.0.2.tar.gz
URL_HASH SHA256=4735b349845bff1fe95ed11abbfed204eb092cabc37523aa13a80cb830fe5b5e
SOURCE_SUBDIR cpp
BINARY_DIR "${ARROW_BINARY_DIR}"
CMAKE_ARGS "${ARROW_CMAKE_ARGS}"
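
Besides the version bump to apache-arrow-15.0.2, the new -DARROW_RUNTIME_SIMD_LEVEL=SSE4_2 argument caps the SIMD dispatch level Arrow will use at run time regardless of what the host CPU advertises. Assuming the arrow/config.h runtime-info API is available in this build (arrow::GetRuntimeInfo() is an assumption here, not something the commit touches), the cap can be sanity-checked like this:

#include <arrow/config.h>
#include <iostream>

int main()
{
  // simd_level: level Arrow actually dispatches to (capped by ARROW_RUNTIME_SIMD_LEVEL);
  // detected_simd_level: best level the current CPU reports.
  arrow::RuntimeInfo info = arrow::GetRuntimeInfo();
  std::cout << "runtime SIMD: " << info.simd_level << " (detected: " << info.detected_simd_level << ")"
            << std::endl;
  return 0;
}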

@@ -5,8 +5,14 @@
#include <arrow/io/api.h>
#include <parquet/exception.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/schema.h>
#include <arrow/type.h>
#include <parquet/arrow/writer.h>
#include <iostream>
#include <array>
#include <type_traits>
static void usage(const std::string& pname)
{
std::cout << "usage: " << pname << " [-dalbscih]" << std::endl;
@@ -49,8 +55,7 @@ void generateIntTable(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/int32.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int32.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -76,15 +81,13 @@ void generateInt64Table(std::string fileDir)
std::shared_ptr<arrow::Array> array;
PARQUET_THROW_NOT_OK(builder.Finish(&array));
std::shared_ptr<arrow::Schema> schema = arrow::schema({arrow::field("col1", arrow::int64())});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {array});
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/int64.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int64.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -112,8 +115,7 @@ void generateFloatTable(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/float.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/float.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -142,8 +144,7 @@ void generateDoubleTable(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/double.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/double.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -165,14 +166,14 @@ void generateTimeTable(std::string fileDir)
std::shared_ptr<arrow::Array> time32array;
PARQUET_THROW_NOT_OK(time32builder.Finish(&time32array));
std::shared_ptr<arrow::Schema> schema = arrow::schema({arrow::field("col1", arrow::time32(arrow::TimeUnit::MILLI))});
std::shared_ptr<arrow::Schema> schema =
arrow::schema({arrow::field("col1", arrow::time32(arrow::TimeUnit::MILLI))});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {time32array});
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/time.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/time.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -194,14 +195,14 @@ void generateTime64Table(std::string fileDir)
std::shared_ptr<arrow::Array> time64array;
PARQUET_THROW_NOT_OK(time64builder.Finish(&time64array));
std::shared_ptr<arrow::Schema> schema = arrow::schema({arrow::field("col1", arrow::time64(arrow::TimeUnit::MICRO))});
std::shared_ptr<arrow::Schema> schema =
arrow::schema({arrow::field("col1", arrow::time64(arrow::TimeUnit::MICRO))});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {time64array});
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/time64.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/time64.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -268,8 +269,7 @@ void generateStringTable(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/string.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/string.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -291,14 +291,14 @@ void generateTimestampTable(std::string fileDir)
std::shared_ptr<arrow::Array> tsarray;
PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray));
std::shared_ptr<arrow::Schema> schema = arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MILLI))});
std::shared_ptr<arrow::Schema> schema =
arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MILLI))});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {tsarray});
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -320,14 +320,14 @@ void generateTimestampUsTable(std::string fileDir)
std::shared_ptr<arrow::Array> tsarray;
PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray));
std::shared_ptr<arrow::Schema> schema = arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MICRO))});
std::shared_ptr<arrow::Schema> schema =
arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MICRO))});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {tsarray});
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -355,8 +355,7 @@ void generateDateTable(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/date.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/date.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -384,8 +383,7 @@ void generateInt16Table(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/int16.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int16.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -407,15 +405,13 @@ void generateInt8Table(std::string fileDir)
std::shared_ptr<arrow::Array> i8array;
PARQUET_THROW_NOT_OK(i8builder.Finish(&i8array));
std::shared_ptr<arrow::Schema> schema = arrow::schema({arrow::field("col1", arrow::int8())});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {i8array});
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/int8.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int8.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -443,8 +439,7 @@ void generateDecimalTable(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/decimal.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/decimal.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -470,15 +465,13 @@ void generateUintTable(std::string fileDir)
std::shared_ptr<arrow::Array> array;
PARQUET_THROW_NOT_OK(builder.Finish(&array));
std::shared_ptr<arrow::Schema> schema = arrow::schema({arrow::field("col1", arrow::uint32())});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {array});
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint32.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint32.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -506,8 +499,7 @@ void generateUint16Table(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint16.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint16.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -535,8 +527,7 @@ void generateUint8Table(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint8.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint8.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -568,8 +559,7 @@ void generateUint64Table(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint64.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint64.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -598,8 +588,7 @@ void generateBoolTable(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/bool.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/bool.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -623,8 +612,7 @@ void generateNullTable(std::string fileDir)
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/null.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/null.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3));
PARQUET_THROW_NOT_OK(outfile->Close());
}
@@ -1006,8 +994,8 @@ void generateAllTable(std::string fileDir)
// make schema
// 28 cols
std::shared_ptr<arrow::Schema> schema = arrow::schema({
arrow::field("col1", arrow::int32()),
std::shared_ptr<arrow::Schema> schema =
arrow::schema({arrow::field("col1", arrow::int32()),
arrow::field("col2", arrow::int64()),
arrow::field("col3", arrow::float32()),
arrow::field("col4", arrow::float64()),
@@ -1034,44 +1022,18 @@ void generateAllTable(std::string fileDir)
arrow::field("col25", arrow::timestamp(arrow::TimeUnit::MICRO)),
arrow::field("col26", arrow::timestamp(arrow::TimeUnit::MICRO)),
arrow::field("col27", arrow::binary()),
arrow::field("col28", arrow::fixed_size_binary(4))
});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {
int32Array,
int64Array,
floatArray,
doubleArray,
time32array,
strarray,
strarray,
strarray,
strarray,
strarray,
strarray,
tsarray,
date32array,
tsarray,
i16array,
i8array,
decimalArray,
uint32Array,
ui16array,
ui8array,
uint64Array,
boolArray,
decimalArray1,
time64array,
tsarray1,
tsarray1,
binaryArray,
fixedSizeArray
});
arrow::field("col28", arrow::fixed_size_binary(4))});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(
schema,
{int32Array, int64Array, floatArray, doubleArray, time32array, strarray, strarray,
strarray, strarray, strarray, strarray, tsarray, date32array, tsarray,
i16array, i8array, decimalArray, uint32Array, ui16array, ui8array, uint64Array,
boolArray, decimalArray1, time64array, tsarray1, tsarray1, binaryArray, fixedSizeArray});
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/tests.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/tests.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 1000));
PARQUET_THROW_NOT_OK(outfile->Close());
@@ -1082,8 +1044,8 @@ void generateAllTable(std::string fileDir)
std::shared_ptr<arrow::Array> nullarray;
PARQUET_THROW_NOT_OK(nullBuilder.Finish(&nullarray));
std::shared_ptr<arrow::Schema> schema1 = arrow::schema({
arrow::field("col1", arrow::null()),
std::shared_ptr<arrow::Schema> schema1 =
arrow::schema({arrow::field("col1", arrow::null()),
arrow::field("col2", arrow::null()),
arrow::field("col3", arrow::null()),
arrow::field("col4", arrow::null()),
@@ -1110,123 +1072,172 @@ void generateAllTable(std::string fileDir)
arrow::field("col25", arrow::timestamp(arrow::TimeUnit::MICRO)),
arrow::field("col26", arrow::timestamp(arrow::TimeUnit::MILLI)),
arrow::field("col27", arrow::null()),
arrow::field("col28", arrow::null())
});
std::shared_ptr<arrow::Table> table1 = arrow::Table::Make(schema1, {
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
strarray,
nullarray,
nullarray,
strarray,
tsarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
nullarray,
tsarray1,
tsarray,
nullarray,
nullarray
});
arrow::field("col28", arrow::null())});
std::shared_ptr<arrow::Table> table1 = arrow::Table::Make(
schema1, {nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray,
strarray, nullarray, nullarray, strarray, tsarray, nullarray, nullarray,
nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray,
nullarray, nullarray, nullarray, tsarray1, tsarray, nullarray, nullarray});
std::shared_ptr<arrow::io::FileOutputStream> outfile1;
PARQUET_ASSIGN_OR_THROW(
outfile1, arrow::io::FileOutputStream::Open(fileDir + "/nulls.parquet"));
PARQUET_ASSIGN_OR_THROW(outfile1, arrow::io::FileOutputStream::Open(fileDir + "/nulls.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table1, pool, outfile1, 3));
PARQUET_THROW_NOT_OK(outfile1->Close());
}
/**
* generate large volume parquet files
*/
void generateLargeTable(int64_t reserve_num, std::string rowNum, std::string fileDir)
{
// int32
arrow::Int32Builder builder;
// int64_t reserve_num = 1000000;
PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num));
std::vector<bool> validity(reserve_num, true);
std::vector<int32_t> values;
for (int32_t i = 0; i < reserve_num; i++)
values.push_back(i);
PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity));
std::shared_ptr<arrow::Array> array;
PARQUET_THROW_NOT_OK(builder.Finish(&array));
// timestamp
arrow::TimestampBuilder tsbuilder(arrow::timestamp(arrow::TimeUnit::MILLI), arrow::default_memory_pool());
PARQUET_THROW_NOT_OK(tsbuilder.Reserve(reserve_num));
std::vector<bool> tsvalidity(reserve_num, true);
std::vector<int64_t> tsvalues;
for (int64_t i = 0; i < reserve_num; i++)
tsvalues.push_back(i * 10000001);
PARQUET_THROW_NOT_OK(tsbuilder.AppendValues(tsvalues, tsvalidity));
std::shared_ptr<arrow::Array> tsarray;
PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray));
// string
arrow::StringBuilder strbuilder;
PARQUET_THROW_NOT_OK(strbuilder.Reserve(reserve_num));
std::vector<std::string> values1;
for (int64_t i = reserve_num-1; i >= 0; i--)
class LargeDataProducer : public arrow::RecordBatchReader
{
values1.push_back(std::string("hhhh"));
public:
LargeDataProducer(size_t numberOfRowsToProduce, size_t chunkSize)
: numberOfRowsToProduce_(numberOfRowsToProduce), chunkSize_(chunkSize)
{
PARQUET_THROW_NOT_OK(i32builder_.Reserve(chunkSize));
PARQUET_THROW_NOT_OK(tsbuilder_.Reserve(chunkSize));
PARQUET_THROW_NOT_OK(d128builder_.Reserve(chunkSize));
PARQUET_THROW_NOT_OK(strbuilder_.Reserve(chunkSize));
PARQUET_THROW_NOT_OK(doublebuilder_.Reserve(chunkSize));
}
PARQUET_THROW_NOT_OK(strbuilder.AppendValues(values1));
std::shared_ptr<arrow::Array> strarray;
PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray));
// decimal
auto t = arrow::Decimal128Type::Make(38, 10);
PARQUET_ASSIGN_OR_THROW(auto t1, t);
arrow::Decimal128Builder d128builder(t1, arrow::default_memory_pool());
for (int64_t i = 0; i < reserve_num; i++)
PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("1234567890987654321.12345678")));
std::shared_ptr<arrow::Array> decimalArray;
PARQUET_THROW_NOT_OK(d128builder.Finish(&decimalArray));
virtual std::shared_ptr<arrow::Schema> schema() const
{
return arrow::schema(
{arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::timestamp(arrow::TimeUnit::MILLI)),
arrow::field("col3", arrow::utf8()), arrow::field("col4", arrow::decimal128(38, 10)),
arrow::field("col5", arrow::float64()), arrow::field("col6", arrow::utf8())});
}
// double
arrow::DoubleBuilder doublebuilder;
PARQUET_THROW_NOT_OK(doublebuilder.Reserve(reserve_num));
std::vector<bool> dvalidity(reserve_num, true);
virtual arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch)
{
if (restChunkSize() == 0)
{
*batch = nullptr;
}
else
{
*batch = arrow::RecordBatch::Make(schema(), restChunkSize(),
{ProduceInts32(), ProduceTimeStamps(), ProduceStrings(),
ProduceDecimals(), ProduceDoubles(), ProduceStrings()});
}
rowsProduced_ += restChunkSize();
return arrow::Status::OK();
}
private:
size_t restChunkSize()
{
return std::min(chunkSize_, numberOfRowsToProduce_ - rowsProduced_);
}
std::shared_ptr<arrow::Array> ProduceInts32()
{
size_t num = restChunkSize();
std::vector<bool> validity(num, true);
std::vector<int32_t> values;
values.reserve(num);
for (int32_t i = (int32_t)rowsProduced_; i < (int32_t)(rowsProduced_ + num); i++)
values.push_back(i);
PARQUET_THROW_NOT_OK(i32builder_.AppendValues(values, validity));
std::shared_ptr<arrow::Array> array;
PARQUET_THROW_NOT_OK(i32builder_.Finish(&array));
i32builder_.Reset();
return array;
}
std::shared_ptr<arrow::Array> ProduceDoubles()
{
size_t num = restChunkSize();
std::vector<bool> dvalidity(num, true);
std::vector<double> dvalues;
for (int i = 0; i < reserve_num; i++)
for (size_t i = rowsProduced_; i < rowsProduced_ + num; i++)
dvalues.push_back(i + 2.5);
PARQUET_THROW_NOT_OK(doublebuilder.AppendValues(dvalues, dvalidity));
PARQUET_THROW_NOT_OK(doublebuilder_.AppendValues(dvalues, dvalidity));
std::shared_ptr<arrow::Array> doublearray;
PARQUET_THROW_NOT_OK(doublebuilder.Finish(&doublearray));
PARQUET_THROW_NOT_OK(doublebuilder_.Finish(&doublearray));
doublebuilder_.Reset();
return doublearray;
}
std::shared_ptr<arrow::Schema> schema = arrow::schema({
arrow::field("col1", arrow::int32()),
arrow::field("col2", arrow::timestamp(arrow::TimeUnit::MILLI)),
arrow::field("col3", arrow::utf8()),
arrow::field("col4", arrow::decimal128(38, 10)),
arrow::field("col5", arrow::float64()),
arrow::field("col6", arrow::utf8())
});
std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {array, tsarray, strarray, decimalArray, doublearray, strarray});
std::shared_ptr<arrow::Array> ProduceStrings()
{
size_t num = restChunkSize();
std::vector<std::string> values;
for (size_t i = 0; i < num ; i++)
{
values.push_back(std::string("hhhh"));
}
PARQUET_THROW_NOT_OK(strbuilder_.AppendValues(values));
std::shared_ptr<arrow::Array> strarray;
PARQUET_THROW_NOT_OK(strbuilder_.Finish(&strarray));
strbuilder_.Reset();
return strarray;
}
// write to file
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::Array> ProduceTimeStamps()
{
size_t num = restChunkSize();
std::vector<bool> tsvalidity(num, true);
std::vector<int64_t> tsvalues;
for (int64_t i = 0; i < int64_t(num); i++)
tsvalues.push_back(i * 10000001);
PARQUET_THROW_NOT_OK(tsbuilder_.AppendValues(tsvalues, tsvalidity));
std::shared_ptr<arrow::Array> tsarray;
PARQUET_THROW_NOT_OK(tsbuilder_.Finish(&tsarray));
tsbuilder_.Reset();
return tsarray;
}
std::shared_ptr<arrow::Array> ProduceDecimals()
{
size_t num = restChunkSize();
for (size_t i = 0; i < num; i++)
PARQUET_THROW_NOT_OK(d128builder_.Append(arrow::Decimal128("1234567890987654321.12345678")));
std::shared_ptr<arrow::Array> decimalArray;
PARQUET_THROW_NOT_OK(d128builder_.Finish(&decimalArray));
d128builder_.Reset();
return decimalArray;
}
private:
arrow::Int32Builder i32builder_;
arrow::DoubleBuilder doublebuilder_;
arrow::StringBuilder strbuilder_;
arrow::TimestampBuilder tsbuilder_{arrow::timestamp(arrow::TimeUnit::MILLI), arrow::default_memory_pool()};
arrow::Decimal128Builder d128builder_{arrow::Decimal128Type::Make(38, 10).ValueOrDie()};
size_t numberOfRowsToProduce_ = 100;
size_t chunkSize_ = 10;
size_t rowsProduced_ = 0;
};
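// Rough, assumed numbers rather than anything measured in this commit: with
// chunkSize_ = 1,000,000 the producer above keeps on the order of 100 bytes per
// row (six columns plus their staging vectors) alive at once, i.e. roughly
// 100 MB per chunk, while the previous generateLargeTable() built all
// reserve_num rows of every column before the single WriteTable() call.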
void generateLargeTable(size_t reserve_num, std::string rowNum, std::string fileDir)
{
// Data is in RBR
std::shared_ptr<arrow::RecordBatchReader> batch_stream = std::make_shared<LargeDataProducer>(reserve_num, 1000000);
// Choose compression
std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().build();
// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<parquet::ArrowWriterProperties> arrow_props = parquet::ArrowWriterProperties::Builder().build();
// Create a writer
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/" + rowNum + "MRows.parquet"));
std::unique_ptr<parquet::arrow::FileWriter> writer;
PARQUET_ASSIGN_OR_THROW(
outfile, arrow::io::FileOutputStream::Open(fileDir + "/" + rowNum + "MRows.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 100000));
PARQUET_THROW_NOT_OK(outfile->Close());
writer, parquet::arrow::FileWriter::Open(*batch_stream->schema().get(), arrow::default_memory_pool(),
outfile, props, arrow_props));
// Write each batch as a row_group
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *batch_stream)
{
PARQUET_ASSIGN_OR_THROW(auto batch, maybe_batch);
PARQUET_ASSIGN_OR_THROW(auto table, arrow::Table::FromRecordBatches(batch->schema(), {batch}));
PARQUET_THROW_NOT_OK(writer->WriteTable(*table.get(), batch->num_rows()));
}
// Write file footer and close
PARQUET_THROW_NOT_OK(writer->Close());
}
int main(int argc, char** argv)