Streaming execution engine

Warning

The streaming execution engine is experimental, and a stable API is not yet guaranteed.

Motivation

For many complex computations, successive direct invocation of compute functions is not feasible in either memory or computation time. Doing so causes all intermediate data to be fully materialized. To facilitate arbitrarily large inputs and more efficient resource usage, Arrow also provides a streaming query engine with which computations can be formulated and executed.

An example graph of a streaming execution workflow.

ExecNode is provided to reify the graph of operations in a query. Batches of data (ExecBatch) flow along edges of the graph from node to node. Structuring the API around streams of batches allows the working set for each node to be tuned for optimal performance independent of any other nodes in the graph. Each ExecNode processes batches as they are pushed to it along an edge of the graph by upstream nodes (its inputs), and pushes batches along an edge of the graph to downstream nodes (its outputs) as they are finalized.

Overview

ExecNode

Each node in the graph is an implementation of the ExecNode interface.

ExecPlan

A set of ExecNode is contained and (to an extent) coordinated by an ExecPlan.

ExecFactoryRegistry

Instances of ExecNode are constructed by factory functions held in a ExecFactoryRegistry.

ExecNodeOptions

Heterogeneous parameters for factories of ExecNode are bundled in an ExecNodeOptions.

Declaration

A dplyr-inspired helper for efficient construction of an ExecPlan.

ExecBatch

A lightweight container for a single chunk of data in the Arrow format. In contrast to RecordBatch, ExecBatch is intended for use exclusively in a streaming execution context (for example, it doesn't have a corresponding Python binding). Furthermore, columns which happen to have a constant value may be represented by a Scalar instead of an Array. In addition, ExecBatch may carry execution-relevant properties, including a guaranteed-true-filter for Expression simplification.
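
For illustration, a minimal sketch of building such a batch by hand (assuming an existing Array named scores of length 3; the variable names here are ours, not part of the API):

// Two columns: an ordinary Array and a constant column represented by a Scalar.
ExecBatch batch({Datum(scores), Datum(MakeScalar(42))}, /*length=*/3);

// Attach a guaranteed-true-filter; downstream Expression simplification may use it.
batch.guarantee = greater(field_ref("score"), literal(0));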

An example ExecNode implementation which simply passes all input batches through unchanged:

class PassthruNode : public ExecNode {
 public:
  // InputReceived is the main entry point for ExecNodes. It is invoked
  // by an input of this node to push a batch here for processing.
  void InputReceived(ExecNode* input, ExecBatch batch) override {
    // Since this is a passthru node we simply push the batch to our
    // only output here.
    outputs_[0]->InputReceived(this, batch);
  }

  // ErrorReceived is called by an input of this node to report an error.
  // ExecNodes should always forward errors to their outputs unless they
  // are able to fully handle the error (this is rare).
  void ErrorReceived(ExecNode* input, Status error) override {
    outputs_[0]->ErrorReceived(this, error);
  }

  // InputFinished is used to signal how many batches will ultimately arrive.
  // It may be called with any ordering relative to InputReceived/ErrorReceived.
  void InputFinished(ExecNode* input, int total_batches) override {
    outputs_[0]->InputFinished(this, total_batches);
  }

  // ExecNodes may request that their inputs throttle production of batches
  // until they are ready for more, or stop production if no further batches
  // are required.  These signals should typically be forwarded to the inputs
  // of the ExecNode.
  void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
  void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
  void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

  // An ExecNode has a single output schema to which all its batches conform.
  using ExecNode::output_schema;

  // ExecNodes carry basic introspection for debugging purposes
  const char* kind_name() const override { return "PassthruNode"; }
  using ExecNode::label;
  using ExecNode::SetLabel;
  using ExecNode::ToString;

  // An ExecNode holds references to its inputs and outputs, so it is possible
  // to walk the graph of execution if necessary.
  using ExecNode::inputs;
  using ExecNode::outputs;

  // StartProducing() and StopProducing() are invoked by an ExecPlan to
  // coordinate the graph-wide execution state.  These do not need to be
  // forwarded to inputs or outputs.
  Status StartProducing() override { return Status::OK(); }
  void StopProducing() override {}
  Future<> finished() override { return inputs_[0]->finished(); }
};

Note that each method which is associated with an edge of the graph must be invoked with an ExecNode* to identify the node which invoked it. For example, in an ExecNode which implements JOIN this tagging might be used to differentiate between batches from the left or right inputs. InputReceived, ErrorReceived, InputFinished may only be invoked by the inputs of a node, while ResumeProducing, PauseProducing, StopProducing may only be invoked by outputs of a node.
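
For instance, a sketch of how a two-input node might use this tag inside InputReceived (illustrative only, not one of the built-in nodes):

void InputReceived(ExecNode* input, ExecBatch batch) override {
  // Identify which edge the batch arrived on by comparing against inputs_.
  if (input == inputs_[0]) {
    // ... handle a batch from the left input ...
  } else {
    // ... handle a batch from the right input ...
  }
}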

ExecPlan contains the associated instances of ExecNode and is used to start and stop execution of all nodes and for querying/awaiting their completion:

// construct an ExecPlan first to hold your nodes
ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

// ... add nodes to your ExecPlan

// start all nodes in the graph
ARROW_RETURN_NOT_OK(plan->StartProducing());

SetUserCancellationCallback([plan] {
  // stop all nodes in the graph
  plan->StopProducing();
});

// Complete will be marked finished when all nodes have run to completion
// or acknowledged a StopProducing() signal. The ExecPlan should be kept
// alive until this future is marked finished.
Future<> complete = plan->finished();

Constructing ExecPlan objects

Warning

The following will be superseded by construction from Compute IR, see ARROW-14074.

None of the concrete implementations of ExecNode are exposed in headers, so they can’t be constructed directly outside the translation unit where they are defined. Instead, factories to create them are provided in an extensible registry. This structure provides a number of benefits:

  • This enforces consistent construction.

  • It decouples implementations from consumers of the interface (for example, we have distinct classes for scalar and grouped aggregation, and we can choose which to construct within a single factory by checking whether grouping keys are provided).

  • It expedites integration with out-of-library extensions. For example, "scan" nodes are implemented in the separate libarrow_dataset.so library.

  • Since the class is not referenceable outside the translation unit in which it is defined, compilers can optimize more aggressively.

Factories of ExecNode can be retrieved by name from the registry. The default registry is available through arrow::compute::default_exec_factory_registry() and can be queried for the built-in factories:

// get the factory for "filter" nodes:
ARROW_ASSIGN_OR_RAISE(auto make_filter,
                      default_exec_factory_registry()->GetFactory("filter"));

// factories take three arguments:
ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
    // the ExecPlan which should own this node
    plan.get(),

    // nodes which will send batches to this node (inputs)
    {scan_node},

    // parameters unique to "filter" nodes
    FilterNodeOptions{filter_expression}));

// alternative shorthand:
ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
    plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));

Factories can also be added to the default registry as long as they are convertible to std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>.
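
For example, a minimal sketch of registering a custom factory via the registry's AddFactory method (the node name "passthru" and the function MakePassthruNode are hypothetical):

// A hypothetical factory matching the required signature.
Result<ExecNode*> MakePassthruNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                   const ExecNodeOptions& options) {
  // ... construct a node owned by `plan` from `inputs` and `options` ...
}

// Register it under a name so MakeExecNode("passthru", ...) can find it.
ARROW_RETURN_NOT_OK(
    default_exec_factory_registry()->AddFactory("passthru", MakePassthruNode));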

To build an ExecPlan representing a simple pipeline which reads from a RecordBatchReader then filters, projects, and writes to disk:

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                      SourceNodeOptions::FromReader(
                                          reader,
                                          GetCpuThreadPool()));

ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                      FilterNodeOptions{
                                        greater(field_ref("score"), literal(3))
                                      });

ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                       ProjectNodeOptions{
                                         {add(field_ref("score"), literal(1))},
                                         {"score + 1"}
                                       });

arrow::dataset::internal::Initialize();
MakeExecNode("write", plan.get(), {project_node},
             WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});

Declaration is a dplyr-inspired helper which further decreases the boilerplate associated with populating an ExecPlan from C++:

arrow::dataset::internal::Initialize();

std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
ASSERT_OK(Declaration::Sequence(
              {
                  {"source", SourceNodeOptions::FromReader(
                       reader,
                       GetCpuThreadPool())},
                  {"filter", FilterNodeOptions{
                       greater(field_ref("score"), literal(3))}},
                  {"project", ProjectNodeOptions{
                       {add(field_ref("score"), literal(1))},
                       {"score + 1"}}},
                  {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
              })
              .AddToPlan(plan.get()));

Note that a source node can wrap anything which resembles a stream of batches. For example, PR#11032 adds support for using a DuckDB query as a source node. Similarly, a sink node can wrap anything which absorbs a stream of batches. In the example above, we are writing completed batches to disk. However, we can also collect these in memory into a Table or forward them to a RecordBatchReader as an out-of-graph stream. This flexibility allows an ExecPlan to be used as streaming middleware between any endpoints which support Arrow formatted batches.
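
As a sketch of the collect-into-a-Table variant (assuming sink_gen was populated by a sink node's SinkNodeOptions, and using the MakeGeneratorReader and Table::FromRecordBatchReader utilities that also appear in the examples below):

// Wrap the asynchronous sink generator in a synchronous RecordBatchReader,
std::shared_ptr<RecordBatchReader> sink_reader =
    MakeGeneratorReader(schema, std::move(sink_gen), default_memory_pool());

// then drain the reader into a Table.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> collected,
                      Table::FromRecordBatchReader(sink_reader.get()));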

An arrow::dataset::Dataset can also be wrapped as a source node which pushes all the dataset’s batches into an ExecPlan. This factory is added to the default registry with the name "scan" by calling arrow::dataset::internal::Initialize():

arrow::dataset::internal::Initialize();

std::shared_ptr<Dataset> dataset = GetDataset();

ASSERT_OK(Declaration::Sequence(
              {
                  {"scan", ScanNodeOptions{dataset,
                     /* push down predicate, projection, ... */}},
                  {"filter", FilterNodeOptions{/* ... */}},
                  // ...
              })
              .AddToPlan(plan.get()));

Datasets may be scanned multiple times; just make multiple scan nodes from that dataset. (Useful for a self-join, for example.) Note that producing two scan nodes like this will perform all reads and decodes twice.

Constructing ExecNode using Options

ExecNode is the building block from which execution plans are composed. Each built-in operation below is provided as an ExecNode implementation.

This is the list of operations associated with the execution plan:

Operations and Options

Operation        Options
source           arrow::compute::SourceNodeOptions
filter           arrow::compute::FilterNodeOptions
project          arrow::compute::ProjectNodeOptions
aggregate        arrow::compute::AggregateNodeOptions
sink             arrow::compute::SinkNodeOptions
consuming_sink   arrow::compute::ConsumingSinkNodeOptions
order_by_sink    arrow::compute::OrderBySinkNodeOptions
select_k_sink    arrow::compute::SelectKSinkNodeOptions
scan             arrow::dataset::ScanNodeOptions
hash_join        arrow::compute::HashJoinNodeOptions
write            arrow::dataset::WriteNodeOptions
union            N/A

source

A source operation can be considered as an entry point to create a streaming execution plan. arrow::compute::SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available but it can be quite tricky to configure. To process data from files the scan operation is likely a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<arrow::util::optional<arrow::compute::ExecBatch>>, matching the generators used in the examples below. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The Arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with them. For this example we use a vector of record batches that we've already stored in memory. In addition, the schema of the data must be known up front; Arrow's streaming execution engine must know the schema of the data at each stage of the execution graph before any processing has begun. This means we must supply the schema for a source node separately from the data itself.

Here we define a struct to hold the data generator definition. This includes in-memory batches, a schema, and a function that serves as a data generator:

struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};

Generating sample batches for computation:

arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}

Example of using source (usage of sink is explained in detail in sink):

/**
 * \brief
 * Source-Sink Example
 * This example shows how a source and sink can be used
 * in an execution plan. A source node receives the data and
 * the sink node emits it as an output represented in a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

filter

The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows matching a given expression. Filters can be written using arrow::compute::Expression. For example, if we wish to keep rows where the value of column a is greater than 3, then we can use the following expression.

Filter example:

/**
 * \brief
 * Source-Filter-Sink
 * This example shows how a filter can be used in an execution plan,
 * along with the source and sink operations. The output from the
 * execution plan is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter.  This filter keeps rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into a filter node.
  // The filter must be set in both the scan node options and the filter node options:
  // at the scan node it is used for on-disk / push-down filtering;
  // at the filter node it is used for in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

project

The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. This is exposed via arrow::compute::ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each output column (if names are not provided, the string representations of the expressions will be used).

Project example:

/**
 * \brief
 * Scan-Project-Sink
 * This example shows how the scan operation can be used to load the data
 * into the execution plan, how the project operation can be applied on the
 * data stream, and how the output is obtained as a table via the sink node.
 *
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // schema after projection => multiply(a, 2): int64
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

aggregate

The aggregate node computes various types of aggregates over data.

Arrow supports two types of aggregates: “scalar” aggregates, and “hash” aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

arrow::compute::AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

The aggregation can provide results as a group or scalar. For instance, an operation like hash_count provides a count per unique value as a grouped result, while an operation like sum provides a single record.

Scalar Aggregation example:

/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how an aggregation operation can be applied on an
 * execution plan resulting in a scalar output. The source node loads the
 * data and the aggregation (summing the values in column 'a')
 * is applied on this data. The output is obtained from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

Group Aggregation example:

/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how an aggregation operation can be applied on an
 * execution plan resulting in a grouped output. The source node loads the
 * data and the aggregation (counting non-null values in column 'a',
 * grouped by column 'b') is applied on this data. The output is obtained
 * from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int32()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

sink

The sink operation provides output and is the final node of a streaming execution definition. The arrow::compute::SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected the caller will repeatedly call this function until the generator is exhausted (returns arrow::util::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should only have one "terminal" node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold the unconsumed batches by exec_plan->finished().
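
A minimal sketch of draining the generator by hand, assuming sink_gen was populated via cp::SinkNodeOptions{&sink_gen} as in the examples below (the examples themselves convert it to a reader with cp::MakeGeneratorReader instead):

// Each call to the generator returns a future for the next batch.
while (true) {
  arrow::Future<arrow::util::optional<cp::ExecBatch>> batch_fut = sink_gen();
  // Block until the next batch (or end-of-stream) arrives.
  ARROW_ASSIGN_OR_RAISE(arrow::util::optional<cp::ExecBatch> maybe_batch,
                        batch_fut.result());
  if (!maybe_batch) break;  // generator exhausted
  // ... process *maybe_batch ...
}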

As a part of the Source Example, the Sink operation is also included:

/**
 * \brief
 * Source-Sink Example
 * This example shows how a source and sink can be used
 * in an execution plan. A source node receives the data and
 * the sink node emits it as an output represented in a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

consuming_sink

The consuming_sink operator is a sink operation that consumes the data within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes a callback function that is expected to consume each batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have been completed. Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).

Example:

// define a custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();

struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Consumption logic goes here: transfer the batch to another system,
  // do some work, write to disk, etc.
  arrow::Status Consume(cp::ExecBatch batch) override {
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

cp::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      MakeExecNode("consuming_sink", plan.get(), {source},
                                   cp::ConsumingSinkNodeOptions(consumer)));

Consuming-Sink example:

/**
 * \brief
 * Source-ConsumingSink
 * This example shows how the data can be consumed within the execution plan
 * by using a ConsumingSink node. There is no data output from this execution plan.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
  // plan start producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // Mark consumption complete, plan should finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}

order_by_sink

The order_by_sink operation is an extension to the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing the arrow::compute::OrderBySinkNodeOptions. Here the arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort by ascending or descending values.

Note

This node is a “pipeline breaker” and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

Order-By-Sink example:

/**
 * \brief
 * Source-OrderBySink
 * In this example, the data enters through the source node
 * and the data is ordered in the sink node. The order can be
 * ASCENDING or DESCENDING and it is configurable. The output
 * is obtained as a table from the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

select_k_sink

The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. The selection criteria are defined using arrow::compute::SelectKOptions, which builds on the OrderBySinkNode definition. This option returns a sink node that receives inputs and then computes top_k/bottom_k.

Note

This node is a “pipeline breaker” and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

SelectK example:

/**
 * \brief
 * Source-KSelect
 * This example shows how K number of elements can be selected
 * either from the top or bottom. The output node is a modified
 * sink node where output can be obtained as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

scan

scan is an operation used to load and process datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. This means you may want to supply the same filter expression to the scan node that you also supply to the filter node, because the filtering is done in two different places: at the scan node it enables on-disk / push-down filtering, while at the filter node it applies to the rows actually read.

Scan example:

/**
 * \brief
 * Scan-Sink
 * This example shows how scan operation can be applied on a dataset.
 * There are operations that can be applied on the scan (project, filter)
 * and the input data can be processed. The output is obtained as a table
 * via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // Execution plan created
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

write

The write node saves query results as a dataset of files in a format like Parquet, Feather, or CSV, using the Tabular Datasets functionality in Arrow. The write options are provided via arrow::dataset::WriteNodeOptions, which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. The latter provides control over the written dataset, including options like the output directory, file naming scheme, and so on.

Write example:

/**
 * \brief
 * Scan-Filter-Write
 * This example shows how the scan node can be used to load the data
 * and, after processing, how it can be written to disk.
 * \param exec_context : execution context
 * \param file_path : file saving path
 * \return arrow::Status
 */
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  std::shared_ptr<arrow::fs::FileSystem> filesystem =
      arrow::fs::FileSystemFromUri(uri, &root_path).ValueOrDie();

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  auto future = plan->finished();
  ARROW_RETURN_NOT_OK(future.status());
  future.Wait();
  return arrow::Status::OK();
}

union

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.

Union example:

/**
 * \brief
 * Source-Union-Sink
 * This example shows how a union operation can be applied on two
 * data sources. The output is obtained as a table via the sink
 * node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  std::shared_ptr<cp::ExecPlan> plan = cp::ExecPlan::Make(&exec_context).ValueOrDie();
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);

  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                                                union_node,
                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
                                            })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

hash_join

The hash_join operation provides the join relational algebra operation using a hash-based algorithm. arrow::compute::HashJoinNodeOptions contains the options required to define a join. hash_join supports left/right/full semi/anti/outer joins. The join key (i.e. the column(s) to join on) and suffixes (e.g. a suffix like "_x" which can be appended to column names duplicated in both left and right relations) can be set via the join options. Read more on hash joins.

Hash-Join example:

/**
 * \brief
 * Source-HashJoin-Sink
 * This example shows how the source node gets the data and how a self-join
 * is applied on the data. The join options are configurable. The output
 * is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

Summary

Examples of these nodes can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.

Complete Example:

 19#include <arrow/array.h>
 20#include <arrow/builder.h>
 21
 22#include <arrow/compute/api.h>
 23#include <arrow/compute/api_vector.h>
 24#include <arrow/compute/cast.h>
 25#include <arrow/compute/exec/exec_plan.h>
 26
 27#include <arrow/csv/api.h>
 28
 29#include <arrow/dataset/dataset.h>
 30#include <arrow/dataset/file_base.h>
 31#include <arrow/dataset/file_parquet.h>
 32#include <arrow/dataset/plan.h>
 33#include <arrow/dataset/scanner.h>
 34
 35#include <arrow/io/interfaces.h>
 36#include <arrow/io/memory.h>
 37
 38#include <arrow/result.h>
 39#include <arrow/status.h>
 40#include <arrow/table.h>
 41
 42#include <arrow/ipc/api.h>
 43
 44#include <arrow/util/future.h>
 45#include <arrow/util/range.h>
 46#include <arrow/util/thread_pool.h>
 47#include <arrow/util/vector.h>
 48
 49#include <iostream>
 50#include <memory>
 51#include <utility>
 52
 53// Demonstrate various operators in Arrow Streaming Execution Engine
 54
 55namespace cp = ::arrow::compute;
 56
 57constexpr char kSep[] = "******";
 58
 59void PrintBlock(const std::string& msg) {
 60  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
 61}
 62
 63template <typename TYPE,
 64          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
 65                                             arrow::is_boolean_type<TYPE>::value |
 66                                             arrow::is_temporal_type<TYPE>::value>::type>
 67arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
 68    const std::vector<typename TYPE::c_type>& values) {
 69  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
 70  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
 71  ARROW_BUILDER_TYPE builder;
 72  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
 73  std::shared_ptr<ARROW_ARRAY_TYPE> array;
 74  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
 75  ARROW_RETURN_NOT_OK(builder.Finish(&array));
 76  return array;
 77}
 78
 79template <class TYPE>
 80arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
 81    const std::vector<std::string>& values) {
 82  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
 83  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
 84  ARROW_BUILDER_TYPE builder;
 85  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
 86  std::shared_ptr<ARROW_ARRAY_TYPE> array;
 87  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
 88  ARROW_RETURN_NOT_OK(builder.Finish(&array));
 89  return array;
 90}
 91
 92arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
 93    const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
 94  std::shared_ptr<arrow::RecordBatch> record_batch;
 95  ARROW_ASSIGN_OR_RAISE(auto struct_result,
 96                        arrow::StructArray::Make(array_vector, field_vector));
 97  return record_batch->FromStructArray(struct_result);
 98}
 99
100/**
101 * \brief Get the Dataset object
102 *  Creating Dataset
103 *  a, b
104    1,null
105    2,true
106    null,true
107    3,false
108    null,true
109    4,false
110    5,null
111    6,false
112    7,false
113    8,true
114 * \return arrow::Result<std::shared_ptr<arrow::dataset::Dataset>>
115 */
116arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
117  auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
118  ARROW_ASSIGN_OR_RAISE(auto int64_array,
119                        GetArrayDataSample<arrow::Int64Type>(
120                            {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));
121
122  arrow::BooleanBuilder boolean_builder;
123  std::shared_ptr<arrow::BooleanArray> bool_array;
124
125  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
126                                      false, false, false, false, true};
127  std::vector<bool> is_valid = {false, true,  true, true, true,
128                                true,  false, true, true, true};
129
130  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));
131
132  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));
133
134  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));
135
136  auto record_batch =
137      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
138                                              arrow::field("b", arrow::boolean())}),
139                               10, {int64_array, bool_array});
140  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
141  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
142  return ds;
143}
144
145arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
146    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
147  std::shared_ptr<arrow::RecordBatch> record_batch;
148  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
149  cp::ExecBatch batch{*res_batch};
150  return batch;
151}
152
153// (Doc section: BatchesWithSchema Definition)
154struct BatchesWithSchema {
155  std::vector<cp::ExecBatch> batches;
156  std::shared_ptr<arrow::Schema> schema;
157  // // This method uses internal arrow utilities to
158  // // convert a vector of record batches to an AsyncGenerator of optional batches
159  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
160    auto opt_batches = ::arrow::internal::MapVector(
161        [](cp::ExecBatch batch) { return arrow::util::make_optional(std::move(batch)); },
162        batches);
163    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
164    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
165    return gen;
166  }
167};
168// (Doc section: BatchesWithSchema Definition)
169
170// (Doc section: MakeBasicBatches Definition)
171arrow::Result<BatchesWithSchema> MakeBasicBatches() {
172  BatchesWithSchema out;
173  auto field_vector = {arrow::field("a", arrow::int32()),
174                       arrow::field("b", arrow::boolean())};
175  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
176  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
177  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
178
179  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
180                        GetArrayDataSample<arrow::BooleanType>({false, true}));
181  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
182                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
183  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
184                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));
185
186  ARROW_ASSIGN_OR_RAISE(auto b1,
187                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
188  ARROW_ASSIGN_OR_RAISE(auto b2,
189                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
190  ARROW_ASSIGN_OR_RAISE(auto b3,
191                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
192
193  out.batches = {b1, b2, b3};
194  out.schema = arrow::schema(field_vector);
195  return out;
196}
197// (Doc section: MakeBasicBatches Definition)
198
199arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
200  BatchesWithSchema out;
201  auto field = arrow::field("a", arrow::int32());
202  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
203  ARROW_ASSIGN_OR_RAISE(auto b2_int,
204                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
205  ARROW_ASSIGN_OR_RAISE(auto b3_int,
206                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
207  ARROW_ASSIGN_OR_RAISE(auto b4_int,
208                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
209  ARROW_ASSIGN_OR_RAISE(auto b5_int,
210                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
211  ARROW_ASSIGN_OR_RAISE(auto b6_int,
212                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
213  ARROW_ASSIGN_OR_RAISE(auto b7_int,
214                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
215  ARROW_ASSIGN_OR_RAISE(auto b8_int,
216                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
217
218  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
219  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
220  ARROW_ASSIGN_OR_RAISE(auto b3,
221                        GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
222  ARROW_ASSIGN_OR_RAISE(auto b4,
223                        GetExecBatchFromVectors({field, field, field, field},
224                                                {b4_int, b5_int, b6_int, b7_int}));
225  out.batches = {b1, b2, b3, b4};
226  out.schema = arrow::schema({field});
227  return out;
228}
229
230arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
  BatchesWithSchema out;
  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"alpha", "gamma", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
                                         {"gamma", "beta", "alpha"}));
  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
  out.batches = {b1, b2, b3};

  size_t batch_count = out.batches.size();
  for (int repeat = 1; repeat < multiplicity; ++repeat) {
    for (size_t i = 0; i < batch_count; ++i) {
      out.batches.push_back(out.batches[i]);
    }
  }

  out.schema = arrow::schema(fields);
  return out;
}

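/**
 * \brief
 * A helper that validates and starts the given plan, collects the
 * batches emitted through sink_gen into a table, prints it, and then
 * waits for the plan to finish.
 * \param exec_context : execution context
 * \param plan : the execution plan to run
 * \param schema : schema of the batches emitted by the sink node
 * \param sink_gen : the async generator produced by the sink node
 * \return arrow::Status
 */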
arrow::Status ExecutePlanAndCollectAsTable(
    cp::ExecContext& exec_context, std::shared_ptr<cp::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      cp::MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // wait until the plan is marked finished
  auto future = plan->finished();
  return future.status();
}

// (Doc section: Scan Example)
/**
 * \brief
 * Scan-Sink
 * This example shows how a scan operation can be applied on a dataset.
 * Operations such as project and filter can be pushed into the scan so
 * that the input data is processed as it is read. The output is
 * obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
  // create the execution plan
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  cp::ExecNode* scan;
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {scan}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}
// (Doc section: Scan Example)

// (Doc section: Source Example)
/**
 * \brief
 * Source-Sink Example
 * This example shows how a source and a sink can be used in an
 * execution plan: the source node produces the data and the sink
 * node collects it, here emitted as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
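  // the source node pulls its batches from an async generator over the test data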

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {source}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}
// (Doc section: Source Example)

// (Doc section: Filter Example)
/**
 * \brief
 * Scan-Filter-Sink
 * This example shows how a filter can be used in an execution plan,
 * together with the scan and sink operations. The output from the
 * execution plan is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter: this keeps only the rows where the value of
  // the "a" column is greater than 3
  cp::Expression filter_opt = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set the filter on the scanner: on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_opt;
  // empty projection
  options->projection = cp::project({}, {});

  std::cout << "Initialized Scanning Options" << std::endl;

  // construct the scan node
  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  // pipe the scan node into a filter node.
  // The filter must be set in both the scan node options and the filter
  // node options: at the scan node it drives on-disk / push-down
  // filtering, at the filter node it drives in-memory filtering.
  cp::ExecNode* filter;
  ARROW_ASSIGN_OR_RAISE(filter, cp::MakeExecNode("filter", plan.get(), {scan},
                                                 cp::FilterNodeOptions{filter_opt}));

  // finally, pipe the filter node into a sink node
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen}));

  return ExecutePlanAndCollectAsTable(exec_context, plan, dataset->schema(), sink_gen);
}

// (Doc section: Filter Example)

// (Doc section: Project Example)
/**
 * \brief
 * Scan-Project-Sink
 * This example shows how the scan operation loads data into the
 * execution plan, how a projection can be applied on the data
 * stream, and how the output is obtained as a table via the sink
 * node.
 *
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // the expression to project: a * 2
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  // empty projection for the scan; the projection itself is applied by
  // the project node below
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  cp::ExecNode* project;
  ARROW_ASSIGN_OR_RAISE(project, cp::MakeExecNode("project", plan.get(), {scan},
                                                  cp::ProjectNodeOptions{{a_times_2}}));
  // print the schema after projection, e.g. multiply(a, 2)
  std::cout << "Schema after projection : \n"
            << project->output_schema()->ToString() << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen}));
  auto schema = arrow::schema({arrow::field("a * 2", arrow::int32())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: Project Example)

// (Doc section: Scalar Aggregate Example)
/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how an aggregation operation can be applied in an
 * execution plan, resulting in a scalar output. The source node loads
 * the data and the aggregation (here, a sum over column 'a') is applied
 * to it. The output is obtained from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
                                                    /*targets=*/{"a"},
                                                    /*names=*/{"sum(a)"}};
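  // "sum" is a scalar aggregate function, so the aggregate node emits a single row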
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  // the sum of an int32 column is promoted to int64
  auto schema = arrow::schema({arrow::field("sum(a)", arrow::int64())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Scalar Aggregate Example)

// (Doc section: Group Aggregate Example)
/**
 * \brief
 * Source-Aggregation-Sink
 * This example shows how a grouped aggregation can be applied in an
 * execution plan. The source node loads the data and the aggregation
 * (a count of the non-null values in column 'a', grouped on column 'b')
 * is applied to it. The output is obtained from the sink node as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
  cp::CountOptions options(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                               /*targets=*/{"a"},
                               /*names=*/{"count(a)"},
                               /*keys=*/{"b"}};
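  // the "hash_" prefix selects the grouped (per-key) variant of the aggregate kernel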
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * aggregate,
      cp::MakeExecNode("aggregate", plan.get(), {source}, aggregate_options));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {aggregate}, cp::SinkNodeOptions{&sink_gen}));
  // count produces an int64 column
  auto schema = arrow::schema({
      arrow::field("count(a)", arrow::int64()),
      arrow::field("b", arrow::boolean()),
  });

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}
// (Doc section: Group Aggregate Example)

// (Doc section: ConsumingSink Example)
/**
 * \brief
 * Source-ConsumingSink
 * This example shows how data can be consumed within the execution plan
 * by using a ConsumingSink node. There is no data output from this
 * execution plan.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
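  // the consumer below counts the batches pushed to it; the plan is only
  // considered finished once the `finish` future is marked complete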
  struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override { return finish; }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  cp::ExecNode* consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink,
                        MakeExecNode("consuming_sink", plan.get(), {source},
                                     cp::ConsumingSinkNodeOptions(consumer)));

  ARROW_RETURN_NOT_OK(consuming_sink->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the plan producing
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // the source should finish fairly quickly
  ARROW_RETURN_NOT_OK(source->finished().status());
  std::cout << "Source Finished!" << std::endl;
  // mark consumption complete; the plan should then finish
  finish.MarkFinished(arrow::Status::OK());
  ARROW_RETURN_NOT_OK(plan->finished().status());
  return arrow::Status::OK();
}
// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

/**
 * \brief
 * Source-OrderBySink
 * In this example, data enters through the source node and is ordered
 * in the sink node. The ordering is configurable (ascending or
 * descending). The output is obtained as a table from the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  std::cout << "basic data created" << std::endl;

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      cp::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
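  // the order_by_sink must see all of its input before it can emit
  // batches through sink_gen in sorted order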

  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)

// (Doc section: HashJoin Example)
/**
 * \brief
 * Source-HashJoin-Sink
 * This example shows how the source node supplies the data and how a
 * self-join is applied on it. The join options are configurable. The
 * output is obtained as a table via the sink node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::ExecNode* left_source;
  cp::ExecNode* right_source;
  for (auto source : {&left_source, &right_source}) {
    ARROW_ASSIGN_OR_RAISE(*source,
                          MakeExecNode("source", plan.get(), {},
                                       cp::SourceNodeOptions{input.schema, input.gen()}));
  }

  cp::HashJoinNodeOptions join_opts{
      cp::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
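  // an inner self-join on the "str" column; the "l_" and "r_" strings
  // disambiguate output column names that collide between the two inputs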

  ARROW_ASSIGN_OR_RAISE(
      auto hashjoin,
      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));

  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {hashjoin}, cp::SinkNodeOptions{&sink_gen}));
  // expected columns: i32, str, l_str, r_str
  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8()),
       arrow::field("l_str", arrow::utf8()), arrow::field("r_str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: HashJoin Example)

// (Doc section: KSelect Example)
/**
 * \brief
 * Source-KSelect
 * This example shows how a number of elements, k, can be selected from
 * either the top or the bottom of the input. The output node is a
 * modified sink node from which the output can be obtained as a table.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      cp::ExecNode * source,
      cp::MakeExecNode("source", plan.get(), {},
                       cp::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
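  // TopKDefault selects the k rows with the largest "i32" values;
  // SelectKOptions::BottomKDefault would select the smallest instead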

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("select_k_sink", plan.get(), {source},
                                       cp::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTable(exec_context, plan, schema, sink_gen);
}

// (Doc section: KSelect Example)

// (Doc section: Write Example)

/**
 * \brief
 * Scan-Filter-Write
 * This example shows how the scan node can be used to load data and
 * how, after processing, the result can be written to disk.
 * \param exec_context : execution context
 * \param file_path : file saving path
 * \return arrow::Status
 */
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
                                     const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  cp::ExecNode* scan;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ARROW_ASSIGN_OR_RAISE(scan,
                        cp::MakeExecNode("scan", plan.get(), {}, scan_node_options));

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
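  // "{i}" is replaced with an auto-incremented integer, producing unique
  // file names within each partition directory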

  arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};

  ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // wait for the plan to finish; status() blocks until completion
  auto future = plan->finished();
  ARROW_RETURN_NOT_OK(future.status());
  return arrow::Status::OK();
}

// (Doc section: Write Example)

// (Doc section: Union Example)

/**
 * \brief
 * Source-Union-Sink
 * This example shows how a union operation can be applied on two
 * data sources. The output is obtained as a table via the sink
 * node.
 * \param exec_context : execution context
 * \return arrow::Status
 */
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
                        cp::ExecPlan::Make(&exec_context));
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

  cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
  cp::Declaration lhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  cp::Declaration rhs{"source",
                      cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  union_node.inputs.emplace_back(lhs);
  union_node.inputs.emplace_back(rhs);
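  // the union node concatenates its inputs; both sources must produce the
  // same schema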

  ARROW_ASSIGN_OR_RAISE(
      auto declr, cp::Declaration::Sequence({
                                                union_node,
                                                {"sink", cp::SinkNodeOptions{&sink_gen}},
                                            })
                      .AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(declr->Validate());

  ARROW_RETURN_NOT_OK(plan->Validate());
  return ExecutePlanAndCollectAsTable(exec_context, plan, basic_data.schema, sink_gen);
}

// (Doc section: Union Example)

enum ExampleMode {
  SOURCE_SINK = 0,
  SCAN = 1,
  FILTER = 2,
  PROJECT = 3,
  SCALAR_AGGREGATION = 4,
  GROUP_AGGREGATION = 5,
  CONSUMING_SINK = 6,
  ORDER_BY_SINK = 7,
  HASHJOIN = 8,
  KSELECT = 9,
  WRITE = 10,
  UNION = 11,
};

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string base_save_path = argv[1];
  int mode = std::atoi(argv[2]);
  arrow::Status status;
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  // execution context
  cp::ExecContext exec_context(arrow::default_memory_pool(),
                               ::arrow::internal::GetCpuThreadPool());
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample(exec_context);
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample(exec_context);
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample(exec_context);
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample(exec_context);
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Group Aggregate Example");
      status = SourceGroupAggregateSinkExample(exec_context);
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Scalar Aggregate Example");
      status = SourceScalarAggregateSinkExample(exec_context);
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample(exec_context);
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample(exec_context);
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample(exec_context);
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample(exec_context);
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(exec_context, base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample(exec_context);
      break;
    default:
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}