Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: SparkSQL
    • Labels:
      None

      Description

      The Spark SQL Data Sources API is described at https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html

      Summary of the changes

      This ticket implements a Cassandra data source and updates CassandraCatalog to use it. It also creates a metastore that stores the metadata of tables from different data sources.

      1. Cassandra data source

      Create a temporary table:

          sqlContext.sql(
            s"""
              |CREATE TEMPORARY TABLE tmpTable
              |USING org.apache.spark.sql.cassandra
              |OPTIONS (
              | c_table "table",
              | keyspace "keyspace",
              | cluster "cluster",
              | push_down "true",
              | spark_cassandra_input_page_row_size "10",
              | spark_cassandra_output_consistency_level "ONE",
              | spark_cassandra_connection_timeout_ms "1000"
              | )
            """.stripMargin.replaceAll("\n", " "))
      
      

      Drop a temporary table:

          sqlContext.dropTempTable("tmpTable")
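
      The CREATE TEMPORARY TABLE statement above passes its settings as quoted key/value pairs in the OPTIONS clause. When those settings come from configuration rather than a literal string, the statement can be generated. The following is a minimal sketch in plain Scala with no Spark dependency; `CassandraDdl` and `createTempTableDdl` are hypothetical names used for illustration and are not part of the connector, and the sketch assumes option values contain no double quotes.

```scala
// Sketch only: CassandraDdl and createTempTableDdl are hypothetical helpers,
// not part of Spark or the Cassandra connector.
object CassandraDdl {
  // Wraps a value in double quotes. Assumes the value itself contains none.
  private def quote(value: String): String = "\"" + value + "\""

  /** Builds a single-line CREATE TEMPORARY TABLE statement.
    * Options are rendered in the order given, each as: key "value".
    */
  def createTempTableDdl(tableName: String, options: Seq[(String, String)]): String = {
    val renderedOptions = options
      .map { case (key, value) => s"$key ${quote(value)}" }
      .mkString(", ")
    s"CREATE TEMPORARY TABLE $tableName " +
      "USING org.apache.spark.sql.cassandra " +
      s"OPTIONS ($renderedOptions)"
  }
}

object CassandraDdlExample extends App {
  val ddl = CassandraDdl.createTempTableDdl(
    "tmpTable",
    Seq("c_table" -> "table", "keyspace" -> "keyspace", "push_down" -> "true"))
  println(ddl)
}
```

      The resulting string can be passed to sqlContext.sql in the same way as the hand-written statement, and the table removed afterwards with dropTempTable.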
      

      Save the result of a query to another table:

          // Requires: import org.apache.spark.sql.SaveMode
          sqlContext.sql("SELECT a, b FROM ddlTable").save("org.apache.spark.sql.cassandra",
            SaveMode.ErrorIfExists, Map("c_table" -> "test_insert1", "keyspace" -> "sql_test"))
      

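      Create a data source relation. The three createRelation overloads match the signatures of Spark's RelationProvider, SchemaRelationProvider, and CreatableRelationProvider traits; apply builds a CassandraSourceRelation from a table reference: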

      
        override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String]): BaseRelation
      
      
        override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String],
          schema: StructType): BaseRelation
      
      
        override def createRelation(
          sqlContext: SQLContext,
          mode: SaveMode,
          parameters: Map[String, String],
          data: DataFrame): BaseRelation
      
        def apply(
          tableRef: TableRef,
          schema: Option[StructType] = None,
          sourceOptions: CassandraSourceOptions = CassandraSourceOptions())(
          implicit sqlContext: SQLContext): CassandraSourceRelation
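
      Each createRelation overload receives the user's OPTIONS as a plain Map[String, String], so an implementation has to turn keys such as c_table, keyspace, cluster and push_down into typed values before it can build a relation. A minimal sketch of that parsing step follows, in plain Scala with no Spark dependency. `TableRefSketch`, `SourceSettings` and `SourceParameters` are hypothetical stand-ins, not the connector's TableRef or CassandraSourceOptions, and the default of push-down being enabled when the option is absent is an assumption.

```scala
// Sketch only: these types are hypothetical stand-ins for illustration,
// not the connector's actual TableRef / CassandraSourceOptions classes.
final case class TableRefSketch(table: String, keyspace: String, cluster: Option[String])

final case class SourceSettings(
  tableRef: TableRefSketch,
  pushDown: Boolean,
  extra: Map[String, String]) // remaining options, passed through untouched

object SourceParameters {
  // Keys consumed by the parser; everything else ends up in `extra`.
  private val reservedKeys = Set("c_table", "keyspace", "cluster", "push_down")

  def parse(parameters: Map[String, String]): SourceSettings = {
    // Fails fast with a clear message when a mandatory option is missing.
    def required(key: String): String =
      parameters.getOrElse(
        key,
        throw new IllegalArgumentException(s"Missing required option: $key"))

    val tableRef = TableRefSketch(
      table = required("c_table"),
      keyspace = required("keyspace"),
      cluster = parameters.get("cluster")) // optional

    // Assumption: push-down is enabled unless explicitly set to "false".
    val pushDown = parameters.get("push_down").forall(_.toBoolean)

    val extra = parameters.filter { case (key, _) => !reservedKeys.contains(key) }
    SourceSettings(tableRef, pushDown, extra)
  }
}
```

      Keeping the parsing in one place means the three overloads can share it and differ only in how they use the optional schema, save mode and DataFrame arguments.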
      

              People

              • Assignee: Alex Liu (alexliu)
              • Reporter: Alex Liu (alexliu)
              • Reviewer: Russell Spitzer
              • Reviewer 2: Piotr Kołaczkowski
              • Votes: 0
              • Watchers: 9
