Skip to content

Commit

Permalink
Add query runner context and kRange frames
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Jun 25, 2024
1 parent 28238cd commit 93ddd89
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 207 deletions.
14 changes: 7 additions & 7 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,13 @@ void WindowPartition::updateKRangeFrameBounds(
// The user is expected to set the frame column equal to NULL when the
// ORDER BY value is NULL and not in any other case. Validate this
// assumption.
VELOX_DCHECK_EQ(
RowContainer::isNullAt(
partitionRow, frameRowColumn.nullByte(), frameRowColumn.nullMask()),
RowContainer::isNullAt(
partitionRow,
orderByRowColumn.nullByte(),
orderByRowColumn.nullMask()));
// auto isFrameColNull = RowContainer::isNullAt(
// partitionRow, frameRowColumn.nullByte(), frameRowColumn.nullMask());
// auto isOrderByColNull = RowContainer::isNullAt(
// partitionRow, orderByRowColumn.nullByte(), orderByRowColumn.nullMask());
// if (isFrameColNull != isOrderByColNull) {
// VELOX_FAIL("Null values do not match at row {}", i);
// }

// If the frame is NULL or 0 preceding or 0 following then the current row
// has same values for order by and frame column. In that case
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/fuzzer/AggregationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ void AggregationFuzzer::go() {

auto partitionKeys = generateKeys("p", argNames, argTypes);
auto sortingKeys = generateSortingKeys("s", argNames, argTypes);
auto input = generateInputDataWithRowNumber(
auto input = generateInputDataForWindowFuzzer(
argNames, argTypes, partitionKeys, signature);

logVectors(input);
Expand Down
56 changes: 45 additions & 11 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ DEFINE_int32(
100,
"The number of elements on each generated vector.");

DEFINE_int32(num_batches, 10, "The number of generated vectors.");
DEFINE_int32(num_batches, 1, "The number of generated vectors.");

DEFINE_int32(
max_num_varargs,
Expand Down Expand Up @@ -236,14 +236,41 @@ std::vector<std::string> AggregationFuzzerBase::generateKeys(
std::vector<std::string> AggregationFuzzerBase::generateSortingKeys(
const std::string& prefix,
std::vector<std::string>& names,
std::vector<TypePtr>& types) {
std::vector<TypePtr>& types,
const bool hasRowNumberKey,
const bool isKRangeFrame) {
std::vector<std::string> keys;
auto numKeys = boost::random::uniform_int_distribution<uint32_t>(1, 5)(rng_);
vector_size_t numKeys;
vector_size_t maxDepth;
std::vector<TypePtr> sortingKeyTypes = kScalarTypes;

// If frame has kRange bound, only one sorting key should be present. If the
// row_number column is not present, generate this sorting key randomly; use
// the row_number column as sorting key otherwise.
if (isKRangeFrame) {
if (hasRowNumberKey) {
return keys;
} else {
numKeys = 1;
// Pick scalar type which supports '+', '-' binary operations.
sortingKeyTypes = {
TINYINT(),
SMALLINT(),
INTEGER(),
BIGINT(),
HUGEINT(),
REAL(),
DOUBLE()};
maxDepth = 0;
}
} else {
numKeys = randInt(1, 5);
maxDepth = 2;
}

for (auto i = 0; i < numKeys; ++i) {
keys.push_back(fmt::format("{}{}", prefix, i));

// Pick random, possibly complex, type.
types.push_back(vectorFuzzer_.randOrderableType(2));
types.push_back(vectorFuzzer_.randOrderableType(maxDepth, sortingKeyTypes));
names.push_back(keys.back());
}

Expand Down Expand Up @@ -296,13 +323,17 @@ std::vector<RowVectorPtr> AggregationFuzzerBase::generateInputData(
return input;
}

std::vector<RowVectorPtr> AggregationFuzzerBase::generateInputDataWithRowNumber(
std::vector<RowVectorPtr>
AggregationFuzzerBase::generateInputDataForWindowFuzzer(
std::vector<std::string> names,
std::vector<TypePtr> types,
const std::vector<std::string>& partitionKeys,
const CallableSignature& signature) {
names.push_back("row_number");
types.push_back(BIGINT());
const CallableSignature& signature,
const bool hasRowNumberKey) {
if (hasRowNumberKey) {
names.push_back("row_number");
types.push_back(BIGINT());
}

auto generator = findInputGenerator(signature);

Expand Down Expand Up @@ -794,11 +825,14 @@ std::unique_ptr<ReferenceQueryRunner> setupReferenceQueryRunner(
LOG(INFO) << "Using DuckDB as the reference DB.";
return duckQueryRunner;
} else {
return std::make_unique<PrestoQueryRunner>(
auto prestoQueryRunner = std::make_unique<PrestoQueryRunner>(
prestoUrl,
runnerName,
static_cast<std::chrono::milliseconds>(reqTimeoutMs));
prestoQueryRunner->queryRunnerContext_ =
std::make_shared<QueryRunnerContext>();
LOG(INFO) << "Using Presto as the reference DB.";
return prestoQueryRunner;
}
}

Expand Down
9 changes: 6 additions & 3 deletions velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ class AggregationFuzzerBase {
std::vector<std::string> generateSortingKeys(
const std::string& prefix,
std::vector<std::string>& names,
std::vector<TypePtr>& types);
std::vector<TypePtr>& types,
const bool hasRowNumberKey = true,
const bool isKRangeFrame = false);

std::pair<CallableSignature, SignatureStats&> pickSignature();

Expand All @@ -202,11 +204,12 @@ class AggregationFuzzerBase {
// child named "row_number" of BIGINT row numbers that differentiates every
// row. Row numbers start from 0. This additional input vector is needed for
// result verification of window aggregations.
std::vector<RowVectorPtr> generateInputDataWithRowNumber(
std::vector<RowVectorPtr> generateInputDataForWindowFuzzer(
std::vector<std::string> names,
std::vector<TypePtr> types,
const std::vector<std::string>& partitionKeys,
const CallableSignature& signature);
const CallableSignature& signature,
const bool hasRowNumberKey = true);

std::pair<std::optional<MaterializedRowMultiset>, ReferenceQueryErrorCode>
computeReferenceResults(
Expand Down
54 changes: 3 additions & 51 deletions velox/exec/fuzzer/PrestoQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,56 +431,6 @@ std::optional<std::string> PrestoQueryRunner::toSql(
return sql.str();
}

namespace {

void appendWindowFrame(
const core::WindowNode::Frame& frame,
std::stringstream& sql) {
// TODO: Add support for k Range Frames by retrieving the original range bound
// from WindowNode.
switch (frame.type) {
case core::WindowNode::WindowType::kRange:
sql << " RANGE";
break;
case core::WindowNode::WindowType::kRows:
sql << " ROWS";
break;
default:
VELOX_UNREACHABLE();
}
sql << " BETWEEN";

auto appendBound = [&sql](
const core::WindowNode::BoundType& bound,
const core::TypedExprPtr& value) {
switch (bound) {
case core::WindowNode::BoundType::kUnboundedPreceding:
sql << " UNBOUNDED PRECEDING";
break;
case core::WindowNode::BoundType::kUnboundedFollowing:
sql << " UNBOUNDED FOLLOWING";
break;
case core::WindowNode::BoundType::kCurrentRow:
sql << " CURRENT ROW";
break;
case core::WindowNode::BoundType::kPreceding:
sql << " " << value->toString() << " PRECEDING";
break;
case core::WindowNode::BoundType::kFollowing:
sql << " " << value->toString() << " FOLLOWING";
break;
default:
VELOX_UNREACHABLE();
}
};

appendBound(frame.startType, frame.startValue);
sql << " AND";
appendBound(frame.endType, frame.endValue);
}

} // namespace

std::optional<std::string> PrestoQueryRunner::toSql(
const std::shared_ptr<const core::WindowNode>& windowNode) {
if (!isSupportedDwrfType(windowNode->sources()[0]->outputType())) {
Expand Down Expand Up @@ -525,7 +475,9 @@ std::optional<std::string> PrestoQueryRunner::toSql(
}
}

appendWindowFrame(functions[i].frame, sql);
auto frameClause =
queryRunnerContext_->windowFrames_.at(windowNode->id()).back();
sql << frameClause;
sql << ")";
}

Expand Down
8 changes: 7 additions & 1 deletion velox/exec/fuzzer/ReferenceQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

namespace facebook::velox::exec::test {

/// State shared with a reference query runner through its
/// 'queryRunnerContext_' member.
class QueryRunnerContext {
public:
// Strings stored per plan node id. PrestoQueryRunner::toSql looks up the entry
// for a WindowNode's id and writes its last string into the generated SQL as
// the window frame clause.
// NOTE(review): which component fills this map is not visible here; confirm
// against the fuzzer code that builds the window plan.
std::unordered_map<core::PlanNodeId, std::vector<std::string>> windowFrames_;
};

/// Query runner that uses reference database, i.e. DuckDB, Presto, Spark.
class ReferenceQueryRunner {
public:
Expand Down Expand Up @@ -78,6 +83,7 @@ class ReferenceQueryRunner {
const std::string& sessionProperty) {
VELOX_UNSUPPORTED();
}
};

std::shared_ptr<QueryRunnerContext> queryRunnerContext_;
};
} // namespace facebook::velox::exec::test
Loading

0 comments on commit 93ddd89

Please sign in to comment.