/* Copyright (C) 2017 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***********************************************************************
 * $Id$
 *
 * mcsv1_UDAF.h
 ***********************************************************************/

/**
 * Columnstore interface for writing User Defined Aggregate
 * Functions (UDAF) and User Defined Analytic Functions (UDAnF),
 * or a function that can act as either - UDA(n)F.
 *
 * The basic steps are:
 *
 * 1. Create the UDA(n)F function interface in some .h file.
 * 2. Create the UDF function implementation in some .cpp file.
 * 3. Create the connector stub (MariaDB UDAF definition) for
 *    this UDF function.
 * 4. Build the dynamic library using all of the source.
 * 5. Put the library in $COLUMNSTORE_INSTALL/lib of
 *    all modules.
 * 6. Restart the Columnstore system.
 * 7. Notify mysqld about the new function:
 *
 *    CREATE AGGREGATE FUNCTION median returns REAL soname
 *    'libudf_mysql.so';
 *
 * The UDAF functions may run distributed in the Columnstore
 * engine. UDAnF do not run distributed.
 *
 * UDAF is User Defined Aggregate Function.
 * UDAnF is User Defined Analytic Function.
 * UDA(n)F is an acronym for a function that could be either. It
 * is also used to describe the interface that is used for
 * either.
*/ #pragma once #include #include #include #include #include "mcsv1_udaf.h" #include "calpontsystemcatalog.h" #include "windowfunctioncolumn.h" #define EXPORT namespace mcsv1sdk { #define DATATYPE double typedef std::map MEDIAN_DATA; // Override UserData for data storage struct MedianData : public UserData { MedianData() = default; ~MedianData() override = default; void serialize(messageqcpp::ByteStream& bs) const override; void unserialize(messageqcpp::ByteStream& bs) override; MEDIAN_DATA mData; private: // For now, copy construction is unwanted explicit MedianData(UserData&); }; // Override mcsv1_UDAF to build your User Defined Aggregate (UDAF) and/or // User Defined Analytic Function (UDAnF). // These will be singleton classes, so don't put any instance // specific data in here. All instance data is stored in mcsv1Context // passed to each user function and retrieved by the getUserData() method. // // Each API function returns a ReturnCode. If ERROR is returned at any time, // the query is aborted, getInterrupted() will begin to return true and the // message set in config->setErrorMessage() is returned to MariaDB. // Return the median value of the dataset class median : public mcsv1_UDAF { public: // Defaults OK median() : mcsv1_UDAF(){}; ~median() override = default; /** * init() * * Mandatory. Implement this to initialize flags and instance * data. Called once per SQL statement. You can do any sanity * checks here. * * colTypes (in) - A vector of ColDataType defining the * parameters of the UDA(n)F call. These can be used to decide * to override the default return type. If desired, the new * return type can be set by context->setReturnType() and * decimal scale and precision can be set by context->setScale * and context->setPrecision respectively. * * Return mcsv1_UDAF::ERROR on any error, such as non-compatible * colTypes or wrong number of arguments. Else return * mcsv1_UDAF::SUCCESS. 
*/ ReturnCode init(mcsv1Context* context, ColumnDatum* colTypes) override; /** * reset() * * Mandatory. Reset the UDA(n)F for a new group, partition or, * in some cases, new Window Frame. Do not free any memory * allocated by context->setUserDataSize(). The SDK Framework owns * that memory and will handle that. Use this opportunity to * reset any variables in context->getUserData() needed for the * next aggregation. May be called multiple times if running in * a ditributed fashion. * * Use this opportunity to initialize the userData. */ ReturnCode reset(mcsv1Context* context) override; /** * nextValue() * * Mandatory. Handle a single row. * * colsIn - A vector of data structure describing the input * data. * * This function is called once for every row in the filtered * result set (before aggregation). It is very important that * this function is efficient. * * If the UDAF is running in a distributed fashion, nextValue * cannot depend on order, as it will only be called for each * row found on the specific PM. * * valsIn (in) - a vector of the parameters from the row. */ ReturnCode nextValue(mcsv1Context* context, ColumnDatum* valsIn) override; /** * subEvaluate() * * Mandatory -- Called if the UDAF is running in a distributed * fashion. Columnstore tries to run all aggregate functions * distributed, depending on context. * * Perform an aggregation on rows partially aggregated by * nextValue. Columnstore calls nextValue for each row on a * given PM for a group (GROUP BY). subEvaluate is called on the * UM to consolodate those values into a single instance of * userData. Keep your aggregated totals in context's userData. * The first time this is called for a group, reset() would have * been called with this version of userData. * * Called for every partial data set in each group in GROUP BY. * * When subEvaluate has been called for all subAggregated data * sets, Evaluate will be called with the same context as here. 
* * valIn (In) - This is a pointer to a memory block of the size * set in setUserDataSize. It will contain the value of userData * as seen in the last call to NextValue for a given PM. * */ ReturnCode subEvaluate(mcsv1Context* context, const UserData* valIn) override; /** * evaluate() * * Mandatory. Get the aggregated value. * * Called for every new group if UDAF GROUP BY, UDAnF partition * or, in some cases, new Window Frame. * * Set the aggregated value into valOut. The datatype is assumed * to be the same as that set in the init() function; * * If the UDAF is running in a distributed fashion, evaluate is * called after a series of subEvaluate calls. * * valOut (out) - Set the aggregated value here. The datatype is * assumed to be the same as that set in the init() function; * * To return a NULL value, don't assign to valOut. */ ReturnCode evaluate(mcsv1Context* context, static_any::any& valOut) override; /** * dropValue() * * Optional -- If defined, the server will call this instead of * reset for UDAnF. * * Don't implement if a UDAnF has one or more of the following: * The UDAnF can't be used with a Window Frame * The UDAnF is not reversable in some way * The UDAnF is not interested in optimal performance * * If not implemented, reset() followed by a series of * nextValue() will be called for each movement of the Window * Frame. * * If implemented, then each movement of the Window Frame will * result in dropValue() being called for each row falling out * of the Frame and nextValue() being called for each new row * coming into the Frame. * * valsDropped (in) - a vector of the parameters from the row * leaving the Frame * * dropValue() will not be called for unbounded/current row type * frames, as those are already optimized. */ ReturnCode dropValue(mcsv1Context* context, ColumnDatum* valsDropped) override; /** * createUserData() * * Optional -- If defined, the server will call this instead of * createUserData on context. 
* * Create your variable length data structure via * data = new * * The data structure may contain references to containers or * pointers to other objects. Remember that for distributed * processing, this may be called multiple times for variaous * computing blocks. At the least, it will be called once per PM * that processes the data, and once more for the UM. For UDAnF, * it may only be called once. * * Set length to the length of the data structure you create. * * For each call to createUserData(), there will be a * corresponding deleteUserData() where you must clean up. Any * memory leaks are your fault. * */ ReturnCode createUserData(UserData*& data, int32_t& length) override; protected: }; }; // namespace mcsv1sdk #undef EXPORT