Integrating Spark SQL Data Sources API
Activity
@Catalin Zamfir I'm sorry you are disappointed by our release cycle, but I want to remind you that this is an open-source project and everyone is encouraged to help. If you find a feature missing, you are free to contribute, whether by writing code or documentation, or by doing reviews.
Yeah, it may have bugs, code may be ugly...
We keep pretty high standards, and we are not going to release anything that we know is broken, could cause trouble for production users, or would require us to revert in the next version. While shipping code fast at lower quality may sound good as a short-term strategy, it would hurt the project in the long run. If you want to use cutting-edge, unstable code, you can still compile from source or even use code from topic branches.
BTW, the master branch worked with Spark 1.3.0 within a week or two of the Spark 1.3.0 release, so it was not 2 months as you say; but I get the message that we need to make the gap smaller, and we'll work harder to improve.
It's now in master branch. Other related tickets are on their way.
All this while the community waits for a release. We've ended up coding our own, using the Java driver and Spark's .parallelize to distribute the query arguments to all nodes (a rough sketch of the approach follows this comment). Beats waiting on the connector 🙂 ... Spark 1.3.0 was released March 13, and it's 20/05/2015 with no clear deadline in sight.
Personally, I'm just tired of watching these two tickets for updates almost daily (just to avoid going non-standard). We took the decision to write our own about a month ago, when no reaction came from here 🙂 ... In fact, we've removed the connector entirely and rely on custom code.
I do hope someone in your project organization takes the time to do the full review and unblock these tickets for the sake of opportunity. It's a beer on me! Yeah, it may have bugs and the code may be ugly, but there's a window of opportunity the community expects, especially from momentum technologies like Spark. Two months after the fact is not opportunity, and most have custom-coded their own solutions for Spark 1.3.x/Cassandra 2.x connections.
🙂
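For readers curious what such custom code looks like, here is a rough, hypothetical sketch of the approach described above (the contact point, keyspace, table, and column names are made up; error handling is omitted; this is not connector code):

import scala.collection.JavaConverters._
import com.datastax.driver.core.Cluster
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("manual-cassandra-read"))
val partitionKeys = Seq("a", "b", "c")

val values = sc.parallelize(partitionKeys).mapPartitions { keys =>
  // One driver session per Spark partition; each task queries its own keys.
  val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
  val session = cluster.connect("my_keyspace")
  val stmt = session.prepare("SELECT col FROM my_table WHERE pk = ?")
  // Materialize the results before closing the session.
  val rows = keys.flatMap(key =>
    session.execute(stmt.bind(key)).iterator().asScala.map(_.getString("col"))
  ).toList
  session.close(); cluster.close()
  rows.iterator
}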
After offline discussion, we made some decisions here. The work is split into the following tickets:
1. This ticket implements only the basic data source API
2. SPARKC-162, Add keyspace/cluster level settings support
3. SPARKC-163, Replace CassandraRelation by CassandraResourceRelation for CassandraCatalog
4. SPARKC-135, Create a metastore to store metadata
5. SPARKC-140, Add custom DDL parser
6. SPARKC-137, Add table creation command
7. SPARKC-129, Add custom data types
The Spark SQL Data Sources API is described at https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html
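For context, implementing this API means providing a RelationProvider that builds a BaseRelation. A minimal, hypothetical example (unrelated to the Cassandra source) could look like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Spark SQL instantiates this class when the USING clause names its package.
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new RangeRelation(sqlContext)
}

// A one-column table containing the integers 0..9.
class RangeRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(Seq(StructField("value", IntegerType)))
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0 until 10).map(Row(_))
}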
Summary of the changes
This ticket implements a Cassandra data source and updates CassandraCatalog to use it. It also creates a metastore to store metadata for tables from different data sources.
1. Cassandra data source
Create a temporary table:
sqlContext.sql(
  s"""
     |CREATE TEMPORARY TABLE tmpTable
     |USING org.apache.spark.sql.cassandra
     |OPTIONS (
     |  c_table "table",
     |  keyspace "keyspace",
     |  cluster "cluster",
     |  push_down "true",
     |  spark_cassandra_input_page_row_size "10",
     |  spark_cassandra_output_consistency_level "ONE",
     |  spark_cassandra_connection_timeout_ms "1000"
     |)
   """.stripMargin.replaceAll("\n", " "))
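Once registered, the temporary table can be queried like any other table; for example:

val df = sqlContext.sql("SELECT * FROM tmpTable")
df.show()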
Drop a temporary table:
sqlContext.dropTempTable("tmpTable")
Save a table to another table:
sqlContext.sql("SELECT a, b from ddlTable").save("org.apache.spark.sql.cassandra", ErrorIfExists, Map("c_table" -> "test_insert1", "keyspace" -> "sql_test"))
Create a data source relation:
override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation

override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String],
    schema: StructType): BaseRelation

override def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation

def apply(
    tableRef: TableRef,
    schema: Option[StructType] = None,
    sourceOptions: CassandraSourceOptions = CassandraSourceOptions())(
    implicit sqlContext: SQLContext): CassandraSourceRelation
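The apply method also allows building the relation directly in code. A sketch, assuming TableRef carries the table, keyspace, and an optional cluster name, and that CassandraSourceOptions exposes a pushdown flag:

// Hypothetical direct construction of the relation.
implicit val ctx: SQLContext = sqlContext
val relation = CassandraSourceRelation(
  tableRef = TableRef("table", "keyspace", Option("cluster")),
  sourceOptions = CassandraSourceOptions(pushdown = true))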