
Accessing Secret Manager from Dataflow Pipeline

To increase the security of your Dataflow pipeline, store your sensitive configuration in Secret Manager. This post shows how you can retrieve a database password stored in Secret Manager from your Dataflow pipeline.

_____

 

Intro

Dataflow is Google Cloud’s serverless service for executing data pipelines using a unified batch and stream data processing SDK based on Apache Beam. It enables developers to process large amounts of data without having to worry about infrastructure, and it handles autoscaling in real time. I have been using Dataflow for some time, and I love its simplicity for running my data pipelines in the cloud.

 

Secret Manager

One of the common use cases for Dataflow is to read data from a source and write it to a sink. These sources and sinks usually contain valuable data, and we often protect them with secrets or passwords. As of today, Dataflow does not provide native support for storing and accessing secrets. The common approach to securing those secrets has been to use Cloud KMS to encrypt the secret and decrypt it when running the data pipeline. With the newly launched Secret Manager, we can now store those secrets in Secret Manager and access them from our pipeline, which gives us better security and ease of use. Below, I show an example of how I access a JDBC URL stored in Secret Manager from my Dataflow pipeline. You can find the full source code in my “dataflow-secret-manager” GitHub repository.

 

Database Secret

[Architecture Diagram]

In this example, I am using a Dataflow pipeline to read data from an MS-SQL database hosted in Cloud SQL, and I store the JDBC URL used to connect to the database in Secret Manager. If you are not familiar with JDBC URLs, a JDBC URL is the string the JDBC driver uses to connect to a database; it contains information such as the database IP or host name, the database name to connect to, the username and password to authenticate with, and other configuration properties. In this example, I use the JDBC URL format below to connect to the MS-SQL database.

jdbc:sqlserver://$CLOUD_SQL_PRIVATE_IP;databaseName=wide-world-importers;user=sqlserver;password=$DB_PASSWORD

As you can see, the JDBC URL contains the database password that we need to secure. As a good secure coding practice, we should not store the password as part of our source code, and hence I create a secret in Secret Manager to store the JDBC URL securely.

echo "jdbc:sqlserver://$CLOUD_SQL_PRIVATE_IP;databaseName=wide-world-importers;user=sqlserver;password=$DB_PASSWORD" \
        | gcloud secrets create jdbc-url --locations=$SECRET_REGION \
            --replication-policy=user-managed --data-file=-
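
If you want to double-check that the secret was stored correctly, you can read back its latest version with gcloud. Note that this prints the JDBC URL, including the password, to your terminal, so only run it in a trusted shell.

# Optional check: print the latest version of the secret payload.
gcloud secrets versions access latest --secret=jdbc-url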

In this example, I deploy the Dataflow pipeline code as a Dataflow template and then create a new Dataflow job from that template to start my data pipeline. During initialization, the pipeline accesses the secret in Secret Manager to retrieve the JDBC URL. For the pipeline to access the secret, the Dataflow worker service account needs the Secret Manager Secret Accessor IAM role (roles/secretmanager.secretAccessor) on the secret.

gcloud secrets add-iam-policy-binding jdbc-url \
        --member="serviceAccount:$DATAFLOW_WORKER_SA" \
        --role='roles/secretmanager.secretAccessor'
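
With the IAM binding in place, a job can be created from the staged template by passing the secret’s resource name as a pipeline parameter. The command below is only a sketch: $TEMPLATE_GCS_PATH and $REGION are placeholders for your own template location and region, and the parameter name jdbcUrlSecretName is assumed to match the getJdbcUrlSecretName() option used by the pipeline code in the next section.

# Sketch: create a Dataflow job from the staged classic template,
# passing the full secret version resource name as a pipeline parameter.
gcloud dataflow jobs run read-sales-customers \
        --gcs-location=$TEMPLATE_GCS_PATH \
        --region=$REGION \
        --parameters=jdbcUrlSecretName=projects/$PROJECT_ID/secrets/jdbc-url/versions/latest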

 

Dataflow

You can refer to the jdbcUrlTranslator method shown below to see how the pipeline reads the secret from Secret Manager. It uses the Secret Manager client library to access the secret, given the JDBC URL secret name. This method is called by the NestedValueProvider that is passed into the JdbcIO.DataSourceConfiguration factory method.

private static String jdbcUrlTranslator(String jdbcUrlSecretName) {
  // Reads the secret payload (the JDBC URL) from Secret Manager.
  // jdbcUrlSecretName is the full secret version resource name, e.g.
  // projects/PROJECT_ID/secrets/jdbc-url/versions/latest
  try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
    AccessSecretVersionResponse response = client.accessSecretVersion(jdbcUrlSecretName);

    return response.getPayload().getData().toStringUtf8();
  } catch (IOException e) {
    throw new RuntimeException("Unable to read JDBC URL secret", e);
  }
}

public static void main(String[] args) {
  PipelineOptionsFactory.register(MainPipelineOptions.class);

  MainPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args)
          .withValidation()
          .as(MainPipelineOptions.class);

  // Wraps the secret name option so that jdbcUrlTranslator is only invoked
  // when the JDBC URL is actually needed, i.e. at execution time.
  NestedValueProvider<String, String> jdbcUrlValueProvider =
      NestedValueProvider.of(
          options.getJdbcUrlSecretName(), MainPipeline::jdbcUrlTranslator);

  Pipeline pipeline = Pipeline.create(options);

  pipeline
      .apply("SQL Server - Read Sales.Customers_Archive",
          JdbcIO.<KV<Integer, String>>read()
              .withDataSourceConfiguration(
                  JdbcIO.DataSourceConfiguration.create(
                      StaticValueProvider.of("com.microsoft.sqlserver.jdbc.SQLServerDriver"),
                      jdbcUrlValueProvider))
              // withQuery, withRowMapper and withCoder are also required by
              // JdbcIO.read(); they are omitted here for brevity.
      );

  // Other transforms

  pipeline.run();
}
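
The jdbcUrlSecretName option used above comes from the custom MainPipelineOptions interface defined in the repository. A minimal sketch of what that interface might look like, assuming the secret name is exposed as a ValueProvider so that it works with templated pipelines:

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

// Hypothetical sketch of the options interface referenced above; the real
// definition is in the dataflow-secret-manager repository.
public interface MainPipelineOptions extends PipelineOptions {

  @Description(
      "Secret Manager secret version resource name that holds the JDBC URL, "
          + "e.g. projects/PROJECT_ID/secrets/jdbc-url/versions/latest")
  ValueProvider<String> getJdbcUrlSecretName();

  void setJdbcUrlSecretName(ValueProvider<String> value);
}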

 

Try it!

Open in Cloud Shell

If you want to understand in more detail how the example works, you can run the same pipeline in your own GCP project by clicking the Open in Google Cloud Shell button above (which you can also do from the GitHub repository), and then follow the walkthrough tutorial to guide you along the way.

Please let me know how it goes for you, and share what you learn and any interesting experiences!

_____

 

Like what you read?
Did you find this article helpful, or did it save you time? You can thank me with a cup of coffee. Thank you!
Buy Me a Coffee at ko-fi.com