Перейти к основному содержимому
Перейти к основному содержимому

Spark JDBC

JDBC является одним из самых часто используемых источников данных в Spark. В этом разделе мы предоставим информацию о том, как использовать официальный JDBC-коннектор ClickHouse с Spark.

Чтение данных

public static void main(String[] args) {
        // Инициализация сессии Spark
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        String jdbcURL = "jdbc:ch://localhost:8123/default";
        String query = "select * from example_table where id > 2";


        //---------------------------------------------------------------------------------------------------
        // Загрузка таблицы из ClickHouse с использованием метода jdbc
        //---------------------------------------------------------------------------------------------------
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);

        df1.show();

        //---------------------------------------------------------------------------------------------------
        // Загрузка таблицы из ClickHouse с использованием метода load
        //---------------------------------------------------------------------------------------------------
        Dataset<Row> df2 = spark.read()
                .format("jdbc")
                .option("url", jdbcURL)
                .option("user", "default")
                .option("password", "123456")
                .option("query", query)
                .load();


        df2.show();


        // Остановка сессии Spark
        spark.stop();
    }

Запись данных

 public static void main(String[] args) {
        // Инициализация сессии Spark
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // Подробности соединения JDBC
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        // Создание примера DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });

        List<Row> rows = new ArrayList<Row>();
        rows.add(RowFactory.create(1, "John"));
        rows.add(RowFactory.create(2, "Doe"));


        Dataset<Row> df = spark.createDataFrame(rows, schema);

        //---------------------------------------------------------------------------------------------------
        // Запись df в ClickHouse с использованием метода jdbc
        //---------------------------------------------------------------------------------------------------

        df.write()
                .mode(SaveMode.Append)
                .jdbc(jdbcUrl, "example_table", jdbcProperties);

        //---------------------------------------------------------------------------------------------------
        // Запись df в ClickHouse с использованием метода save
        //---------------------------------------------------------------------------------------------------

        df.write()
                .format("jdbc")
                .mode("append")
                .option("url", jdbcUrl)
                .option("dbtable", "example_table")
                .option("user", "default")
                .option("password", "123456")
                .save();


        // Остановка сессии Spark
        spark.stop();
    }

Параллелизм

При использовании Spark JDBC Spark считывает данные, используя одну партицию. Для достижения более высокой параллельности необходимо указать partitionColumn, lowerBound, upperBound и numPartitions, которые описывают, как партиционировать таблицу при чтении параллельно из нескольких рабочих узлов. Пожалуйста, посетите официальную документацию Apache Spark для получения дополнительной информации о конфигурациях JDBC.

Ограничения JDBC

  • На сегодняшний день вы можете вставлять данные через JDBC только в существующие таблицы (в настоящее время нет способа автоматически создавать таблицу при вставке DF, как это делает Spark с другими коннекторами).