Skip to:
Provides an interface for users to submit their own pushdown rules. This will be useful for those who want to modify the basic rules provided by the connector based on their integrations with other technologies like Solr etc ...
We should allow a parameter like
spark.cassandra.connector.sql.pushdown.additionalclasses="class,class,class"
Where these classes implement an interface
case class AnalyzedPredicates[Predicate : PredicateOps]( handledByCassandra: Set[Predicate], handledBySpark: Set[Predicate] ) trait CassandraPredicateRules{ def applyRules[T](predicates: AnalyzedPredicates[T], tableDef: TableDef): AnalyzedPredicates[T] }
Then when we determine our filters we just do
private[connector] def predicatePushDown(filters: Array[Filter]) = { /** Apply built in rules **/ val bcpp = new BasicCassandraPredicatePushDown(filters.toSet, tableDef) val basicPushdown = AnalyzedPredicates(bcpp.predicatesToPushDown, bcpp.predicatesToPreserve) logInfo(s"Basic Rules Applied: $basicPushdown") /** Apply any user defined rules **/ val additionalRules: List[CassandraPredicateRules] = List.empty additionalRules.foldRight(basicPushdown)( (rules, pushdowns) => rules.applyRules(pushdowns, tableDef) ) }
Followed all your comments and merged
@Russell Spitzer I reviewed, just few minor comments, go ahead with this ticket once you take a look.
Provides an interface for users to submit their own pushdown rules. This will be useful for those who want to modify the basic rules provided by the connector based on their integrations with other technologies like Solr etc ...
We should allow a parameter like
spark.cassandra.connector.sql.pushdown.additionalclasses="class,class,class"
Where these classes implement an interface
case class AnalyzedPredicates[Predicate : PredicateOps]( handledByCassandra: Set[Predicate], handledBySpark: Set[Predicate] ) trait CassandraPredicateRules{ def applyRules[T](predicates: AnalyzedPredicates[T], tableDef: TableDef): AnalyzedPredicates[T] }
Then when we determine our filters we just do
private[connector] def predicatePushDown(filters: Array[Filter]) = { /** Apply built in rules **/ val bcpp = new BasicCassandraPredicatePushDown(filters.toSet, tableDef) val basicPushdown = AnalyzedPredicates(bcpp.predicatesToPushDown, bcpp.predicatesToPreserve) logInfo(s"Basic Rules Applied: $basicPushdown") /** Apply any user defined rules **/ val additionalRules: List[CassandraPredicateRules] = List.empty additionalRules.foldRight(basicPushdown)( (rules, pushdowns) => rules.applyRules(pushdowns, tableDef) ) }