#include #include #include #include #include #include #include #include #include #include #include #include #include static void usage(const std::string& pname) { std::cout << "usage: " << pname << " [-dalbscih]" << std::endl; std::cout << "generate parquet files" << std::endl; std::cout << " -d generate different parquet files with one data type." << std::endl; std::cout << " -a generate one parquet file with different data types." << std::endl; std::cout << " -l generate large volume parquet files." << std::endl; std::cout << " -b generate one parquet file with NULL data type." << std::endl; std::cout << " -s generate one parquet file with STRING data type." << std::endl; std::cout << " -c generate one parquet file with DECIMAL data type." << std::endl; std::cout << " -i generate one parquet file with INT data type." << std::endl; std::cout << " -h display this help text" << std::endl; } /** * generate one parquet file with INT32 data type */ void generateIntTable(std::string fileDir) { // generate data arrow::Int32Builder builder; int reserve_num = 30; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); std::vector validity(reserve_num, true); validity[1] = 0; validity[2] = 0; validity[3] = 0; std::vector values; for (int32_t i = 0; i < reserve_num - 1; i++) values.push_back(i); values.push_back(static_cast(2147483648)); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int32())}); std::shared_ptr table = arrow::Table::Make(schema, {array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int32.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with INT64 data type */ void generateInt64Table(std::string fileDir) { // generate data arrow::Int64Builder builder; int reserve_num = 30; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); std::vector validity(reserve_num, true); validity[1] = 0; validity[2] = 0; validity[3] = 0; std::vector values; for (int64_t i = 0; i < reserve_num - 1; i++) values.push_back(i); values.push_back(2147483648); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int64())}); std::shared_ptr table = arrow::Table::Make(schema, {array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int64.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with FLOAT data type */ void generateFloatTable(std::string fileDir) { int reserve_num = 30; arrow::FloatBuilder builder; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); std::vector validity(reserve_num, true); validity[2] = 0; std::vector values; for (int i = 0; i < reserve_num; i++) values.push_back(i + 1.5); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::float32())}); std::shared_ptr table = arrow::Table::Make(schema, {array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/float.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with DOUBLE data type */ void generateDoubleTable(std::string fileDir) { // -----------------Float64----------------------- int reserve_num = 30; arrow::DoubleBuilder doublebuilder; PARQUET_THROW_NOT_OK(doublebuilder.Reserve(reserve_num)); std::vector dvalidity(reserve_num, true); dvalidity[3] = 0; std::vector dvalues; for (int i = 0; i < reserve_num; i++) dvalues.push_back(i + 2.5); PARQUET_THROW_NOT_OK(doublebuilder.AppendValues(dvalues, dvalidity)); std::shared_ptr doublearray; PARQUET_THROW_NOT_OK(doublebuilder.Finish(&doublearray)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::float64())}); std::shared_ptr table = arrow::Table::Make(schema, {doublearray}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/double.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with TIME data type */ void generateTimeTable(std::string fileDir) { int reserve_num = 30; arrow::Time32Builder time32builder(arrow::time32(arrow::TimeUnit::MILLI), arrow::default_memory_pool()); // int reserve_num = 500; PARQUET_THROW_NOT_OK(time32builder.Reserve(reserve_num)); std::vector time32validity(reserve_num, true); std::vector time32values; for (int32_t i = 0; i < reserve_num; i++) time32values.push_back(i * 3605000); PARQUET_THROW_NOT_OK(time32builder.AppendValues(time32values, time32validity)); std::shared_ptr time32array; PARQUET_THROW_NOT_OK(time32builder.Finish(&time32array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::time32(arrow::TimeUnit::MILLI))}); std::shared_ptr table = arrow::Table::Make(schema, {time32array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/time.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with TIME64(microsecond) data type */ void generateTime64Table(std::string fileDir) { int64_t reserve_num = 30; arrow::Time64Builder time64builder(arrow::time64(arrow::TimeUnit::MICRO), arrow::default_memory_pool()); // int reserve_num = 500; PARQUET_THROW_NOT_OK(time64builder.Reserve(reserve_num)); std::vector time64validity(reserve_num, true); std::vector time64values; for (int64_t i = 0; i < reserve_num; i++) time64values.push_back(i * 3605001); PARQUET_THROW_NOT_OK(time64builder.AppendValues(time64values, time64validity)); std::shared_ptr time64array; PARQUET_THROW_NOT_OK(time64builder.Finish(&time64array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::time64(arrow::TimeUnit::MICRO))}); std::shared_ptr table = arrow::Table::Make(schema, {time64array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/time64.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with STRING data type */ void generateStringTable(std::string fileDir) { const int reserve_num = 30; // ----------------- String ------------------------- arrow::StringBuilder strbuilder; PARQUET_THROW_NOT_OK(strbuilder.Reserve(reserve_num)); uint8_t validity1[reserve_num]; std::vector values1; for (int64_t i = reserve_num - 1; i >= 0; i--) { // values1.push_back(std::string("hhhh")); validity1[i] = 1; } // 30 different values values1.push_back(std::string("a")); values1.push_back(std::string("ab")); values1.push_back(std::string("abcd")); values1.push_back(std::string("abcde")); values1.push_back(std::string("abcdefg")); values1.push_back(std::string("Whlg1xXAxP")); values1.push_back(std::string("4NimzSQzMD")); values1.push_back(std::string("G23ne3j92Ky0wBF")); values1.push_back(std::string("F4z")); values1.push_back(std::string("8JCVTsGYB7V")); values1.push_back(std::string("23235")); values1.push_back(std::string("sda22")); values1.push_back(std::string("SD7sdFD7")); values1.push_back(std::string("gvv3hYwdfOD")); values1.push_back(std::string("y8wjo4v50s6")); values1.push_back(std::string("aNJW56SJieE8KVV")); values1.push_back(std::string("1+2=3")); values1.push_back(std::string("Hello World!")); values1.push_back(std::string("1!!!1")); values1.push_back(std::string("824407880313877")); values1.push_back(std::string("1970-01-01 08:02:23")); values1.push_back(std::string("1970-05-31")); values1.push_back(std::string("xxx")); values1.push_back(std::string("ONMKMQVBRWBUTWT")); values1.push_back(std::string("ZWMWHSEZDYODQWP")); values1.push_back(std::string("HoCYpJ")); values1.push_back(std::string("-100")); values1.push_back(std::string("Iqa8Nr")); values1.push_back(std::string("nD274v")); values1.push_back(std::string("6y0JyW")); validity1[1] = 0; // set element 1 null PARQUET_THROW_NOT_OK(strbuilder.AppendValues(values1, validity1)); std::shared_ptr strarray; PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::utf8())}); std::shared_ptr table = arrow::Table::Make(schema, {strarray}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/string.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with TIMESTAMP data type(millisecond) */ void generateTimestampTable(std::string fileDir) { int reserve_num = 30; // ----------------- Timestamp ------------------------- arrow::TimestampBuilder tsbuilder(arrow::timestamp(arrow::TimeUnit::MILLI), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(tsbuilder.Reserve(reserve_num)); std::vector tsvalidity(reserve_num, true); std::vector tsvalues; for (int64_t i = 0; i < reserve_num; i++) tsvalues.push_back(i * 10000001); PARQUET_THROW_NOT_OK(tsbuilder.AppendValues(tsvalues, tsvalidity)); std::shared_ptr tsarray; PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MILLI))}); std::shared_ptr table = arrow::Table::Make(schema, {tsarray}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with TIMESTAMP data type(microsecond) */ void generateTimestampUsTable(std::string fileDir) { int reserve_num = 30; // ----------------- Timestamp ------------------------- arrow::TimestampBuilder tsbuilder(arrow::timestamp(arrow::TimeUnit::MICRO), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(tsbuilder.Reserve(reserve_num)); std::vector tsvalidity(reserve_num, true); std::vector tsvalues; for (int64_t i = 0; i < reserve_num; i++) tsvalues.push_back(i * 10000000001); PARQUET_THROW_NOT_OK(tsbuilder.AppendValues(tsvalues, tsvalidity)); std::shared_ptr tsarray; PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::timestamp(arrow::TimeUnit::MICRO))}); std::shared_ptr table = arrow::Table::Make(schema, {tsarray}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/ts.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with DATE data type */ void generateDateTable(std::string fileDir) { int reserve_num = 30; // -------------------------DATETIME arrow::Date32Builder date32builder; PARQUET_THROW_NOT_OK(date32builder.Reserve(reserve_num)); std::vector date32validity(reserve_num, true); std::vector date32values; for (int32_t i = 0; i < reserve_num; i++) date32values.push_back(i * 10); PARQUET_THROW_NOT_OK(date32builder.AppendValues(date32values, date32validity)); std::shared_ptr date32array; PARQUET_THROW_NOT_OK(date32builder.Finish(&date32array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::date32())}); std::shared_ptr table = arrow::Table::Make(schema, {date32array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/date.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with INT16 data type */ void generateInt16Table(std::string fileDir) { int reserve_num = 30; // ---------------int16 arrow::Int16Builder i16builder; PARQUET_THROW_NOT_OK(i16builder.Reserve(reserve_num)); std::vector i16validity(reserve_num, true); std::vector i16values; for (int16_t i = 0; i < reserve_num; i++) i16values.push_back(i); PARQUET_THROW_NOT_OK(i16builder.AppendValues(i16values, i16validity)); std::shared_ptr i16array; PARQUET_THROW_NOT_OK(i16builder.Finish(&i16array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int16())}); std::shared_ptr table = arrow::Table::Make(schema, {i16array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int16.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with INT8 data type */ void generateInt8Table(std::string fileDir) { int reserve_num = 30; // ---------------int16 arrow::Int8Builder i8builder; PARQUET_THROW_NOT_OK(i8builder.Reserve(reserve_num)); std::vector i8validity(reserve_num, true); std::vector i8values; for (int8_t i = 0; i < reserve_num; i++) i8values.push_back(i); PARQUET_THROW_NOT_OK(i8builder.AppendValues(i8values, i8validity)); std::shared_ptr i8array; PARQUET_THROW_NOT_OK(i8builder.Finish(&i8array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int8())}); std::shared_ptr table = arrow::Table::Make(schema, {i8array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/int8.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with DECIMAL data type */ void generateDecimalTable(std::string fileDir) { // ----------------------decimal auto t = arrow::Decimal128Type::Make(9, 3); PARQUET_ASSIGN_OR_THROW(auto t1, t); arrow::Decimal128Builder d128builder(t1, arrow::default_memory_pool()); // std::cout << arrow::Decimal128("138.3433").kBitWidth << std::endl; PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("138.3433"))); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("532235.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("5325.234"))); std::shared_ptr decimalArray; PARQUET_THROW_NOT_OK(d128builder.Finish(&decimalArray)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::decimal128(9, 3))}); std::shared_ptr table = arrow::Table::Make(schema, {decimalArray}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/decimal.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with UNSIGNED INT data type */ void generateUintTable(std::string fileDir) { // generate data arrow::UInt32Builder builder; uint reserve_num = 30; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); std::vector validity(reserve_num, true); validity[1] = 0; validity[2] = 0; validity[3] = 0; std::vector values; for (uint32_t i = 0; i < reserve_num - 1; i++) values.push_back(i); values.push_back(2147483648); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint32())}); std::shared_ptr table = arrow::Table::Make(schema, {array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint32.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with UNSIGNED INT16 data type */ void generateUint16Table(std::string fileDir) { uint16_t reserve_num = 30; // ---------------int16 arrow::UInt16Builder i16builder; PARQUET_THROW_NOT_OK(i16builder.Reserve(reserve_num)); std::vector i16validity(reserve_num, true); std::vector i16values; for (uint16_t i = 0; i < reserve_num; i++) i16values.push_back(i); PARQUET_THROW_NOT_OK(i16builder.AppendValues(i16values, i16validity)); std::shared_ptr i16array; PARQUET_THROW_NOT_OK(i16builder.Finish(&i16array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint16())}); std::shared_ptr table = arrow::Table::Make(schema, {i16array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint16.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with UNSIGNED INT8 data type */ void generateUint8Table(std::string fileDir) { uint8_t reserve_num = 30; // ---------------int16 arrow::UInt8Builder i8builder; PARQUET_THROW_NOT_OK(i8builder.Reserve(reserve_num)); std::vector i8validity(reserve_num, true); std::vector i8values; for (uint8_t i = 0; i < reserve_num; i++) i8values.push_back(i); PARQUET_THROW_NOT_OK(i8builder.AppendValues(i8values, i8validity)); std::shared_ptr i8array; PARQUET_THROW_NOT_OK(i8builder.Finish(&i8array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint8())}); std::shared_ptr table = arrow::Table::Make(schema, {i8array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint8.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with UNSIGNED INT64 data type */ void generateUint64Table(std::string fileDir) { // generate data arrow::UInt64Builder builder; uint64_t reserve_num = 30; PARQUET_THROW_NOT_OK(builder.Reserve(reserve_num)); std::vector validity(reserve_num, true); validity[1] = 0; validity[2] = 0; validity[3] = 0; std::vector values; for (uint64_t i = 0; i < reserve_num - 1; i++) values.push_back(i); values.push_back(2147483648); PARQUET_THROW_NOT_OK(builder.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(builder.Finish(&array)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::uint64())}); std::shared_ptr table = arrow::Table::Make(schema, {array}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/uint64.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with BOOLEAN data type */ void generateBoolTable(std::string fileDir) { int reserve_num = 30; // ----------------------boolean arrow::BooleanBuilder boolBuilder; PARQUET_THROW_NOT_OK(boolBuilder.Reserve(reserve_num)); std::vector boolValidity(reserve_num, true); std::vector boolValues; for (int i = 0; i < reserve_num; i++) boolValues.push_back(true); boolValues[5] = false; PARQUET_THROW_NOT_OK(boolBuilder.AppendValues(boolValues, boolValidity)); std::shared_ptr boolArray; PARQUET_THROW_NOT_OK(boolBuilder.Finish(&boolArray)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::boolean())}); std::shared_ptr table = arrow::Table::Make(schema, {boolArray}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/bool.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate one parquet file with NULL data type */ void generateNullTable(std::string fileDir) { int reserve_num = 30; // ---------------------null arrow::NullBuilder nullBuilder; PARQUET_THROW_NOT_OK(nullBuilder.Reserve(reserve_num)); PARQUET_THROW_NOT_OK(nullBuilder.AppendNulls(reserve_num)); std::shared_ptr nullarray; PARQUET_THROW_NOT_OK(nullBuilder.Finish(&nullarray)); std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::null())}); std::shared_ptr table = arrow::Table::Make(schema, {nullarray}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/null.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 3)); PARQUET_THROW_NOT_OK(outfile->Close()); } /** * generate different parquet files with one data type */ void generateTable(std::string fileDir) { generateBoolTable(fileDir); generateDateTable(fileDir); generateDecimalTable(fileDir); generateDoubleTable(fileDir); generateFloatTable(fileDir); generateInt16Table(fileDir); generateInt64Table(fileDir); generateInt8Table(fileDir); generateIntTable(fileDir); generateNullTable(fileDir); generateStringTable(fileDir); generateTimestampTable(fileDir); generateTimeTable(fileDir); generateUint16Table(fileDir); generateUint64Table(fileDir); generateUint8Table(fileDir); generateUintTable(fileDir); } /** * generate one parquet file with different data types */ void generateAllTable(std::string fileDir) { const int reserve_num = 30; // boolean arrow::BooleanBuilder boolBuilder; PARQUET_THROW_NOT_OK(boolBuilder.Reserve(reserve_num)); std::vector boolValidity(reserve_num, true); std::vector boolValues; for (int i = 0; i < reserve_num; i++) boolValues.push_back(true); boolValues[5] = false; PARQUET_THROW_NOT_OK(boolBuilder.AppendValues(boolValues, boolValidity)); std::shared_ptr boolArray; PARQUET_THROW_NOT_OK(boolBuilder.Finish(&boolArray)); // int32 arrow::Int32Builder int32Builder; PARQUET_THROW_NOT_OK(int32Builder.Reserve(reserve_num)); std::vector int32Validity(reserve_num, true); int32Validity[1] = 0; int32Validity[2] = 0; int32Validity[3] = 0; std::vector int32Values; for (int32_t i = 0; i < reserve_num - 1; i++) int32Values.push_back(i); int32Values.push_back(static_cast(2147483648)); PARQUET_THROW_NOT_OK(int32Builder.AppendValues(int32Values, int32Validity)); std::shared_ptr int32Array; PARQUET_THROW_NOT_OK(int32Builder.Finish(&int32Array)); // int64 arrow::Int64Builder int64Builder; PARQUET_THROW_NOT_OK(int64Builder.Reserve(reserve_num)); std::vector int64Validity(reserve_num, true); int64Validity[1] = 0; int64Validity[2] = 0; int64Validity[3] = 0; std::vector int64Values; for (int64_t i = 0; i < reserve_num - 1; i++) int64Values.push_back(i); int64Values.push_back(2147483648); PARQUET_THROW_NOT_OK(int64Builder.AppendValues(int64Values, int64Validity)); std::shared_ptr int64Array; PARQUET_THROW_NOT_OK(int64Builder.Finish(&int64Array)); // float arrow::FloatBuilder floatBuilder; PARQUET_THROW_NOT_OK(floatBuilder.Reserve(reserve_num)); std::vector floatValidity(reserve_num, true); floatValidity[2] = 0; std::vector floatValues; for (int i = 0; i < reserve_num; i++) floatValues.push_back(i + 1.5); PARQUET_THROW_NOT_OK(floatBuilder.AppendValues(floatValues, floatValidity)); std::shared_ptr floatArray; PARQUET_THROW_NOT_OK(floatBuilder.Finish(&floatArray)); // float64 arrow::DoubleBuilder doubleBuilder; PARQUET_THROW_NOT_OK(doubleBuilder.Reserve(reserve_num)); std::vector dvalidity(reserve_num, true); dvalidity[3] = 0; std::vector dvalues; for (int i = 0; i < reserve_num; i++) dvalues.push_back(i + 2.5); PARQUET_THROW_NOT_OK(doubleBuilder.AppendValues(dvalues, dvalidity)); std::shared_ptr doubleArray; PARQUET_THROW_NOT_OK(doubleBuilder.Finish(&doubleArray)); // time32 arrow::Time32Builder time32builder(arrow::time32(arrow::TimeUnit::MILLI), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(time32builder.Reserve(reserve_num)); std::vector time32validity(reserve_num, true); std::vector time32values; for (int32_t i = 0; i < reserve_num; i++) time32values.push_back(i * 3605001); PARQUET_THROW_NOT_OK(time32builder.AppendValues(time32values, time32validity)); std::shared_ptr time32array; PARQUET_THROW_NOT_OK(time32builder.Finish(&time32array)); // time64(microsecond) int64_t reserve_num64 = 30; arrow::Time64Builder time64builder(arrow::time64(arrow::TimeUnit::MICRO), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(time64builder.Reserve(reserve_num64)); std::vector time64validity(reserve_num, true); std::vector time64values; for (int64_t i = 0; i < reserve_num64; i++) time64values.push_back(i * 3605000001); PARQUET_THROW_NOT_OK(time64builder.AppendValues(time64values, time64validity)); std::shared_ptr time64array; PARQUET_THROW_NOT_OK(time64builder.Finish(&time64array)); // string arrow::StringBuilder strbuilder; PARQUET_THROW_NOT_OK(strbuilder.Reserve(reserve_num)); uint8_t validity1[reserve_num]; std::vector values1; for (int64_t i = reserve_num - 1; i >= 0; i--) { validity1[i] = 1; } // 30 different values values1.push_back(std::string("a")); values1.push_back(std::string("ab")); values1.push_back(std::string("abcd")); values1.push_back(std::string("abcde")); values1.push_back(std::string("abcdefg")); values1.push_back(std::string("Whlg1xXAxP")); values1.push_back(std::string("4NimzSQzMD")); values1.push_back(std::string("G23ne3j92Ky0wBF")); values1.push_back(std::string("F4z")); values1.push_back(std::string("8JCVTsGYB7V")); values1.push_back(std::string("23235")); values1.push_back(std::string("sda22")); values1.push_back(std::string("SD7sdFD7")); values1.push_back(std::string("gvv3hYwdfOD")); values1.push_back(std::string("y8wjo4v50s6")); values1.push_back(std::string("aNJW56SJieE8KVV")); values1.push_back(std::string("1+2=3")); values1.push_back(std::string("Hello World!")); values1.push_back(std::string("1!!!1")); values1.push_back(std::string("824407880313877")); values1.push_back(std::string("1970-01-01 08:02:23")); values1.push_back(std::string("1970-05-31")); values1.push_back(std::string("xxx")); values1.push_back(std::string("ONMKMQVBRWBUTWT")); values1.push_back(std::string("ZWMWHSEZDYODQWP")); values1.push_back(std::string("HoCYpJ")); values1.push_back(std::string("-100")); values1.push_back(std::string("Iqa8Nr")); values1.push_back(std::string("nD274v")); values1.push_back(std::string("6y0JyW")); validity1[1] = 0; // set element 1 null PARQUET_THROW_NOT_OK(strbuilder.AppendValues(values1, validity1)); std::shared_ptr strarray; PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray)); // timestamp(millisecond) arrow::TimestampBuilder tsbuilder(arrow::timestamp(arrow::TimeUnit::MILLI), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(tsbuilder.Reserve(reserve_num)); std::vector tsvalidity(reserve_num, true); std::vector tsvalues; for (int64_t i = 0; i < reserve_num; i++) tsvalues.push_back(i * 10000001); PARQUET_THROW_NOT_OK(tsbuilder.AppendValues(tsvalues, tsvalidity)); std::shared_ptr tsarray; PARQUET_THROW_NOT_OK(tsbuilder.Finish(&tsarray)); // timestamp(microsecond) arrow::TimestampBuilder tsbuilder1(arrow::timestamp(arrow::TimeUnit::MICRO), arrow::default_memory_pool()); PARQUET_THROW_NOT_OK(tsbuilder1.Reserve(reserve_num)); std::vector tsvalidity1(reserve_num, true); std::vector tsvalues1; for (int64_t i = 0; i < reserve_num; i++) tsvalues1.push_back(i * 10000000001); PARQUET_THROW_NOT_OK(tsbuilder1.AppendValues(tsvalues1, tsvalidity1)); std::shared_ptr tsarray1; PARQUET_THROW_NOT_OK(tsbuilder1.Finish(&tsarray1)); // datetime arrow::Date32Builder date32builder; PARQUET_THROW_NOT_OK(date32builder.Reserve(reserve_num)); std::vector date32validity(reserve_num, true); std::vector date32values; for (int32_t i = 0; i < reserve_num; i++) date32values.push_back(i * 10); PARQUET_THROW_NOT_OK(date32builder.AppendValues(date32values, date32validity)); std::shared_ptr date32array; PARQUET_THROW_NOT_OK(date32builder.Finish(&date32array)); // int16 arrow::Int16Builder i16builder; PARQUET_THROW_NOT_OK(i16builder.Reserve(reserve_num)); std::vector i16validity(reserve_num, true); std::vector i16values; for (int16_t i = 0; i < reserve_num; i++) i16values.push_back(i); PARQUET_THROW_NOT_OK(i16builder.AppendValues(i16values, i16validity)); std::shared_ptr i16array; PARQUET_THROW_NOT_OK(i16builder.Finish(&i16array)); // int8 arrow::Int8Builder i8builder; PARQUET_THROW_NOT_OK(i8builder.Reserve(reserve_num)); std::vector i8validity(reserve_num, true); std::vector i8values; for (int8_t i = 0; i < reserve_num; i++) i8values.push_back(i); PARQUET_THROW_NOT_OK(i8builder.AppendValues(i8values, i8validity)); std::shared_ptr i8array; PARQUET_THROW_NOT_OK(i8builder.Finish(&i8array)); // decimal auto t = arrow::Decimal128Type::Make(9, 3); PARQUET_ASSIGN_OR_THROW(auto t1, t); arrow::Decimal128Builder d128builder(t1, arrow::default_memory_pool()); // std::cout << arrow::Decimal128("138.3433").kBitWidth << std::endl; PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("138.3433"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("532235.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("5325.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("138.3433"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("532235.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("5325.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("138.3433"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("532235.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("5325.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("138.3433"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("532235.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("5325.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("138.3433"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("532235.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); PARQUET_THROW_NOT_OK(d128builder.Append(arrow::Decimal128("5325.234"))); PARQUET_THROW_NOT_OK(d128builder.AppendNull()); std::shared_ptr decimalArray; PARQUET_THROW_NOT_OK(d128builder.Finish(&decimalArray)); // uint32 arrow::UInt32Builder uint32Builder; PARQUET_THROW_NOT_OK(uint32Builder.Reserve(reserve_num)); std::vector uint32Validity(reserve_num, true); uint32Validity[1] = 0; uint32Validity[2] = 0; uint32Validity[3] = 0; std::vector uint32Values; for (uint32_t i = 0; i < reserve_num - 1; i++) uint32Values.push_back(i); uint32Values.push_back(2147483648); PARQUET_THROW_NOT_OK(uint32Builder.AppendValues(uint32Values, uint32Validity)); std::shared_ptr uint32Array; PARQUET_THROW_NOT_OK(uint32Builder.Finish(&uint32Array)); // uint16 arrow::UInt16Builder ui16builder; PARQUET_THROW_NOT_OK(ui16builder.Reserve(reserve_num)); std::vector ui16validity(reserve_num, true); std::vector ui16values; for (uint16_t i = 0; i < reserve_num; i++) ui16values.push_back(i); PARQUET_THROW_NOT_OK(ui16builder.AppendValues(ui16values, ui16validity)); std::shared_ptr ui16array; PARQUET_THROW_NOT_OK(ui16builder.Finish(&ui16array)); // uint8 arrow::UInt8Builder ui8builder; PARQUET_THROW_NOT_OK(ui8builder.Reserve(reserve_num)); std::vector ui8validity(reserve_num, true); std::vector ui8values; for (uint8_t i = 0; i < reserve_num; i++) ui8values.push_back(i); PARQUET_THROW_NOT_OK(ui8builder.AppendValues(ui8values, ui8validity)); std::shared_ptr ui8array; PARQUET_THROW_NOT_OK(ui8builder.Finish(&ui8array)); // uint64 arrow::UInt64Builder uint64Builder; uint64_t ureserve_num = 30; PARQUET_THROW_NOT_OK(uint64Builder.Reserve(ureserve_num)); std::vector uint64Validity(ureserve_num, true); uint64Validity[1] = 0; uint64Validity[2] = 0; uint64Validity[3] = 0; std::vector uint64Values; for (uint64_t i = 0; i < ureserve_num - 1; i++) uint64Values.push_back(i); uint64Values.push_back(2147483648); PARQUET_THROW_NOT_OK(uint64Builder.AppendValues(uint64Values, uint64Validity)); std::shared_ptr uint64Array; PARQUET_THROW_NOT_OK(uint64Builder.Finish(&uint64Array)); // decimal(38, 10) auto tDType = arrow::Decimal128Type::Make(38, 10); PARQUET_ASSIGN_OR_THROW(auto dType, tDType); arrow::Decimal128Builder d128builder1(dType, arrow::default_memory_pool()); for (int64_t i = 0; i < reserve_num; i++) PARQUET_THROW_NOT_OK(d128builder1.Append(arrow::Decimal128("1234567890987654321.12345678"))); std::shared_ptr decimalArray1; PARQUET_THROW_NOT_OK(d128builder1.Finish(&decimalArray1)); // binary arrow::BinaryBuilder binaryBuilder; PARQUET_THROW_NOT_OK(binaryBuilder.Reserve(reserve_num)); uint8_t binaryValidity[reserve_num]; std::vector binaryValues; for (int32_t i = reserve_num - 1; i >= 0; i--) { binaryValidity[i] = 1; } // 30 different values binaryValues.push_back(std::string("a")); binaryValues.push_back(std::string("ab")); binaryValues.push_back(std::string("abcd")); binaryValues.push_back(std::string("abcde")); binaryValues.push_back(std::string("abcdefg")); binaryValues.push_back(std::string("Whlg1xXAxP")); binaryValues.push_back(std::string("4NimzSQzMD")); binaryValues.push_back(std::string("G23ne3j92Ky0wBF")); binaryValues.push_back(std::string("F4z")); binaryValues.push_back(std::string("8JCVTsGYB7V")); binaryValues.push_back(std::string("23235")); binaryValues.push_back(std::string("sda22")); binaryValues.push_back(std::string("SD7sdFD7")); binaryValues.push_back(std::string("gvv3hYwdfOD")); binaryValues.push_back(std::string("y8wjo4v50s6")); binaryValues.push_back(std::string("aNJW56SJieE8KVV")); binaryValues.push_back(std::string("1+2=3")); binaryValues.push_back(std::string("Hello World!")); binaryValues.push_back(std::string("1!!!1")); binaryValues.push_back(std::string("824407880313877")); binaryValues.push_back(std::string("1970-01-01 08:02:23")); binaryValues.push_back(std::string("1970-05-31")); binaryValues.push_back(std::string("xxx")); binaryValues.push_back(std::string("ONMKMQVBRWBUTWT")); binaryValues.push_back(std::string("ZWMWHSEZDYODQWP")); binaryValues.push_back(std::string("HoCYpJ")); binaryValues.push_back(std::string("-100")); binaryValues.push_back(std::string("Iqa8Nr")); binaryValues.push_back(std::string("nD274v")); binaryValues.push_back(std::string("6y0JyW")); PARQUET_THROW_NOT_OK(binaryBuilder.AppendValues(binaryValues, binaryValidity)); std::shared_ptr binaryArray; PARQUET_THROW_NOT_OK(binaryBuilder.Finish(&binaryArray)); // fixed_size_binary_array auto tfixedSizeType = arrow::FixedSizeBinaryType::Make(4); PARQUET_ASSIGN_OR_THROW(auto fixedSizeType, tfixedSizeType); arrow::FixedSizeBinaryBuilder fixedSizeBuilder(fixedSizeType); for (uint64_t i = 0; i < reserve_num; i++) PARQUET_THROW_NOT_OK(fixedSizeBuilder.Append("abcd")); std::shared_ptr fixedSizeArray; PARQUET_THROW_NOT_OK(fixedSizeBuilder.Finish(&fixedSizeArray)); // make schema // 28 cols std::shared_ptr schema = arrow::schema({arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()), arrow::field("col3", arrow::float32()), arrow::field("col4", arrow::float64()), arrow::field("col5", arrow::time32(arrow::TimeUnit::MILLI)), arrow::field("col6", arrow::utf8()), arrow::field("col7", arrow::utf8()), arrow::field("col8", arrow::utf8()), arrow::field("col9", arrow::utf8()), arrow::field("col10", arrow::utf8()), arrow::field("col11", arrow::utf8()), arrow::field("col12", arrow::timestamp(arrow::TimeUnit::MILLI)), arrow::field("col13", arrow::date32()), arrow::field("col14", arrow::timestamp(arrow::TimeUnit::MILLI)), arrow::field("col15", arrow::int16()), arrow::field("col16", arrow::int8()), arrow::field("col17", arrow::decimal128(9, 3)), arrow::field("col18", arrow::uint32()), arrow::field("col19", arrow::uint16()), arrow::field("col20", arrow::uint8()), arrow::field("col21", arrow::uint64()), arrow::field("col22", arrow::boolean()), arrow::field("col23", arrow::decimal128(38, 10)), arrow::field("col24", arrow::time64(arrow::TimeUnit::MICRO)), arrow::field("col25", arrow::timestamp(arrow::TimeUnit::MICRO)), arrow::field("col26", arrow::timestamp(arrow::TimeUnit::MICRO)), arrow::field("col27", arrow::binary()), arrow::field("col28", arrow::fixed_size_binary(4))}); std::shared_ptr table = arrow::Table::Make( schema, {int32Array, int64Array, floatArray, doubleArray, time32array, strarray, strarray, strarray, strarray, strarray, strarray, tsarray, date32array, tsarray, i16array, i8array, decimalArray, uint32Array, ui16array, ui8array, uint64Array, boolArray, decimalArray1, time64array, tsarray1, tsarray1, binaryArray, fixedSizeArray}); // write to file arrow::MemoryPool* pool = arrow::default_memory_pool(); std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/tests.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, pool, outfile, 1000)); PARQUET_THROW_NOT_OK(outfile->Close()); // null file arrow::NullBuilder nullBuilder; PARQUET_THROW_NOT_OK(nullBuilder.Reserve(reserve_num)); PARQUET_THROW_NOT_OK(nullBuilder.AppendNulls(reserve_num)); std::shared_ptr nullarray; PARQUET_THROW_NOT_OK(nullBuilder.Finish(&nullarray)); std::shared_ptr schema1 = arrow::schema({arrow::field("col1", arrow::null()), arrow::field("col2", arrow::null()), arrow::field("col3", arrow::null()), arrow::field("col4", arrow::null()), arrow::field("col5", arrow::null()), arrow::field("col6", arrow::null()), arrow::field("col7", arrow::null()), arrow::field("col8", arrow::utf8()), arrow::field("col9", arrow::null()), arrow::field("col10", arrow::null()), arrow::field("col11", arrow::utf8()), arrow::field("col12", arrow::timestamp(arrow::TimeUnit::MILLI)), arrow::field("col13", arrow::null()), arrow::field("col14", arrow::null()), arrow::field("col15", arrow::null()), arrow::field("col16", arrow::null()), arrow::field("col17", arrow::null()), arrow::field("col18", arrow::null()), arrow::field("col19", arrow::null()), arrow::field("col20", arrow::null()), arrow::field("col21", arrow::null()), arrow::field("col22", arrow::null()), arrow::field("col23", arrow::null()), arrow::field("col24", arrow::null()), arrow::field("col25", arrow::timestamp(arrow::TimeUnit::MICRO)), arrow::field("col26", arrow::timestamp(arrow::TimeUnit::MILLI)), arrow::field("col27", arrow::null()), arrow::field("col28", arrow::null())}); std::shared_ptr table1 = arrow::Table::Make( schema1, {nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, strarray, nullarray, nullarray, strarray, tsarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, nullarray, tsarray1, tsarray, nullarray, nullarray}); std::shared_ptr outfile1; PARQUET_ASSIGN_OR_THROW(outfile1, arrow::io::FileOutputStream::Open(fileDir + "/nulls.parquet")); PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table1, pool, outfile1, 3)); PARQUET_THROW_NOT_OK(outfile1->Close()); } class LargeDataProducer : public arrow::RecordBatchReader { public: LargeDataProducer(size_t numberOfRowsToProduce, size_t chunkSize) : numberOfRowsToProduce_(numberOfRowsToProduce), chunkSize_(chunkSize) { PARQUET_THROW_NOT_OK(i32builder_.Reserve(chunkSize)); PARQUET_THROW_NOT_OK(tsbuilder_.Reserve(chunkSize)); PARQUET_THROW_NOT_OK(d128builder_.Reserve(chunkSize)); PARQUET_THROW_NOT_OK(strbuilder_.Reserve(chunkSize)); PARQUET_THROW_NOT_OK(doublebuilder_.Reserve(chunkSize)); } virtual std::shared_ptr schema() const override { return arrow::schema( {arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::timestamp(arrow::TimeUnit::MILLI)), arrow::field("col3", arrow::utf8()), arrow::field("col4", arrow::decimal128(38, 10)), arrow::field("col5", arrow::float64()), arrow::field("col6", arrow::utf8())}); } virtual arrow::Result ReadNext() override { return arrow::Status::NotImplemented("ReadNext with custom metadata"); } virtual arrow::Status ReadNext(std::shared_ptr* batch) override { if (restChunkSize() == 0) { batch = nullptr; } else { *batch = arrow::RecordBatch::Make(schema(), restChunkSize(), {ProduceInts32(), ProduceTimeStamps(), ProduceStrings(), ProduceDecimals(), ProduceDoubles(), ProduceStrings()}); } rowsProduced_ += restChunkSize(); return arrow::Status::OK(); } private: size_t restChunkSize() { return std::min(chunkSize_, numberOfRowsToProduce_ - rowsProduced_); } std::shared_ptr ProduceInts32() { size_t num = restChunkSize(); std::vector validity(num, true); std::vector values; values.reserve(num); for (int32_t i = (int32_t)rowsProduced_; i < (int32_t)(rowsProduced_ + num); i++) values.push_back(i); PARQUET_THROW_NOT_OK(i32builder_.AppendValues(values, validity)); std::shared_ptr array; PARQUET_THROW_NOT_OK(i32builder_.Finish(&array)); i32builder_.Reset(); return array; } std::shared_ptr ProduceDoubles() { size_t num = restChunkSize(); std::vector dvalidity(num, true); std::vector dvalues; for (size_t i = rowsProduced_; i < rowsProduced_ + num; i++) dvalues.push_back(i + 2.5); PARQUET_THROW_NOT_OK(doublebuilder_.AppendValues(dvalues, dvalidity)); std::shared_ptr doublearray; PARQUET_THROW_NOT_OK(doublebuilder_.Finish(&doublearray)); doublebuilder_.Reset(); return doublearray; } std::shared_ptr ProduceStrings() { size_t num = restChunkSize(); std::vector values; for (size_t i = 0; i < num ; i++) { values.push_back(std::string("hhhh")); } PARQUET_THROW_NOT_OK(strbuilder_.AppendValues(values)); std::shared_ptr strarray; PARQUET_THROW_NOT_OK(strbuilder_.Finish(&strarray)); strbuilder_.Reset(); return strarray; } std::shared_ptr ProduceTimeStamps() { size_t num = restChunkSize(); std::vector tsvalidity(num, true); std::vector tsvalues; for (int64_t i = 0; i < int64_t(num); i++) tsvalues.push_back(i * 10000001); PARQUET_THROW_NOT_OK(tsbuilder_.AppendValues(tsvalues, tsvalidity)); std::shared_ptr tsarray; PARQUET_THROW_NOT_OK(tsbuilder_.Finish(&tsarray)); tsbuilder_.Reset(); return tsarray; } std::shared_ptr ProduceDecimals() { size_t num = restChunkSize(); for (size_t i = 0; i < num; i++) PARQUET_THROW_NOT_OK(d128builder_.Append(arrow::Decimal128("1234567890987654321.12345678"))); std::shared_ptr decimalArray; PARQUET_THROW_NOT_OK(d128builder_.Finish(&decimalArray)); d128builder_.Reset(); return decimalArray; } private: arrow::Int32Builder i32builder_; arrow::DoubleBuilder doublebuilder_; arrow::StringBuilder strbuilder_; arrow::TimestampBuilder tsbuilder_{arrow::timestamp(arrow::TimeUnit::MILLI), arrow::default_memory_pool()}; arrow::Decimal128Builder d128builder_{arrow::Decimal128Type::Make(38, 10).ValueOrDie()}; size_t numberOfRowsToProduce_ = 100; size_t chunkSize_ = 10; size_t rowsProduced_ = 0; }; void generateLargeTable(size_t reserve_num, std::string rowNum, std::string fileDir) { // Data is in RBR std::shared_ptr batch_stream = std::make_shared(reserve_num, 1000000); // Choose compression std::shared_ptr props = parquet::WriterProperties::Builder().build(); // Opt to store Arrow schema for easier reads back into Arrow std::shared_ptr arrow_props = parquet::ArrowWriterProperties::Builder().build(); // Create a writer std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(fileDir + "/" + rowNum + "MRows.parquet")); std::unique_ptr writer; PARQUET_ASSIGN_OR_THROW( writer, parquet::arrow::FileWriter::Open(*batch_stream->schema().get(), arrow::default_memory_pool(), outfile, props, arrow_props)); // Write each batch as a row_group for (arrow::Result> maybe_batch : *batch_stream) { PARQUET_ASSIGN_OR_THROW(auto batch, maybe_batch); PARQUET_ASSIGN_OR_THROW(auto table, arrow::Table::FromRecordBatches(batch->schema(), {batch})); PARQUET_THROW_NOT_OK(writer->WriteTable(*table.get(), batch->num_rows())); } // Write file footer and close PARQUET_THROW_NOT_OK(writer->Close()); } int main(int argc, char** argv) { int32_t option; std::string pname(argv[0]); std::string fileDir = "."; // default is current working directory bool genTable = false; bool genAllTable = false; bool genLargeTable = false; bool genNullTable = false; bool genStrTable = false; bool genDecTable = false; bool genIntTable = false; while ((option = getopt(argc, argv, "abcdf:lsih")) != EOF) { switch (option) { case 'd': { genTable = true; break; } case 'a': { genAllTable = true; break; } case 'l': { genLargeTable = true; break; } case 'b': { genNullTable = true; break; } case 's': { genStrTable = true; break; } case 'c': { genDecTable = true; break; } case 'i': { genIntTable = true; break; } case 'f': { fileDir = std::string(optarg); break; } case 'h': case '?': default: usage(pname); return (option == 'h' ? 0 : -1); break; } } if (genTable) { generateTable(fileDir); } if (genAllTable) { generateAllTable(fileDir); } if (genLargeTable) { generateLargeTable(1000000, "1", fileDir); generateLargeTable(10000000, "10", fileDir); generateLargeTable(50000000, "50", fileDir); generateLargeTable(100000000, "100", fileDir); } if (genNullTable) { generateNullTable(fileDir); } if (genStrTable) { generateStringTable(fileDir); } if (genDecTable) { generateDecimalTable(fileDir); } if (genIntTable) { generateIntTable(fileDir); } if (argc == 1) { usage(pname); } return 0; }