Recently, I got a chance to do some R&D on a requirement where I needed to read files stored in a Cloud Storage bucket, process and transform them into the desired format, and store the result in an in-memory data store, i.e., Memorystore, for faster access. Honestly, it took several days of figuring out the right approach before I found the right technologies to implement this.
One of the best services in Google Cloud Platform that I have worked and experimented with is Cloud Dataflow, a fully managed service for executing pipelines within the Google Cloud Platform ecosystem. It is dedicated to transforming and enriching data in both stream (real-time) and batch (historical) modes. It takes a serverless approach, so you can focus on programming instead of managing server clusters, and it integrates with Operations (formerly Stackdriver), which lets you monitor and troubleshoot pipelines while they are running.
Memorystore is Google’s managed implementation of the Redis data store, offering low latency and high scalability. Caching is a technique used to accelerate application response times and help applications scale by placing frequently needed data very close to the application. Memorystore for Redis provides a fully managed service, powered by the Redis in-memory data store, for building application caches with sub-millisecond data access.
Prerequisites
Before creating our dataflow pipeline for the implementation, we need to do three things:
- Create two GCS buckets
Two GCS buckets are required: one to store the input file(s) that will be read, transformed and then stored in the Redis data store, and the other to stage the dataflow pipeline code.
If you are not familiar with the creation of buckets, refer to this GCS documentation.
- Create a Redis Instance
A Memorystore (for Redis) instance is required for our implementation to store the processed data after the cloud dataflow pipeline is executed.
Its IP address (the Redis host) must be provided on the command line when executing the dataflow pipeline. If you are not familiar with creating a Memorystore instance, refer to this Memorystore documentation.
- Upload the input file in the GCS bucket
For the dataflow pipeline to execute, an input file needs to be uploaded to the input GCS bucket, which in our case is cloud-dataflow-input-bucket.
The input file contains pipe-separated data of the form: guid|firstname|lastname|dob|postalcode
The input file can be accessed from here.
What will the dataflow pipeline do?
The idea is that the input file will be read, transformed and stored in a running Redis data store instance.
The transformation step of the pipeline will split each line of the input file into its fields and then store them in the data store as keys mapped to the corresponding guid.
For example, consider the input line xxxxxx|bruce|wayne|31051989|4444, where xxxxxx is the guid, bruce is the firstname, wayne is the lastname, 31051989 is the dob (in DDMMYYYY format) and 4444 is the postalcode.
The pipeline will store the transformed data in the Redis instance like below:
firstname:bruce xxxxxx
lastname:wayne xxxxxx
dob:31051989 xxxxxx
postalcode:4444 xxxxxx
Creating the dataflow pipeline
We will create a template from scratch; the core concepts we relied on are covered in this documentation. We will be creating a dataflow batch job, and for that we need Dataflow SDK 2.x and the Apache Beam SDK for Redis.
<dependency>
    <groupId>com.google.cloud.dataflow</groupId>
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-redis</artifactId>
    <version>2.23.0</version>
</dependency>
For the pipeline code, we have to construct a StorageToRedisOptions object (you can give it any name) using the method PipelineOptionsFactory.fromArgs to read options from the command line.
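The full options interface lives in the repository linked further below; as a rough sketch, it could look something like the following. The getter names mirror the flags used in the main class and the run command (--inputFile, --redisHost, --redisPort), but the base interface, annotations and defaults here are assumptions for illustration, not the exact source.
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

/*
 * Illustrative sketch only -- the actual interface is in the GitHub repository.
 * Each getter/setter pair maps to a command-line flag.
 */
public interface StorageToRedisOptions extends PipelineOptions {

    @Description("Path of the file(s) to read, e.g. gs://cloud-dataflow-input-bucket/*.txt")
    @Validation.Required
    String getInputFile();
    void setInputFile(String value);

    @Description("Host (IP address) of the Memorystore for Redis instance")
    @Validation.Required
    String getRedisHost();
    void setRedisHost(String value);

    @Description("Port of the Redis instance")
    @Default.Integer(6379)
    Integer getRedisPort();
    void setRedisPort(Integer value);
}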
Main Class
public static void main(String[] args) {
    /*
     * Construct a StorageToRedisOptions object using PipelineOptionsFactory.fromArgs
     * to read options from the command line.
     */
    StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StorageToRedisOptions.class);

    Pipeline p = Pipeline.create(options);
    p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()))
            .apply("Transforming data...",
                    ParDo.of(new DoFn<String, String[]>() {
                        @ProcessElement
                        public void TransformData(@Element String line, OutputReceiver<String[]> out) {
                            // Split each pipe-separated line into its fields
                            String[] fields = line.split("\\|");
                            out.output(fields);
                        }
                    }))
            .apply("Processing data...",
                    ParDo.of(new DoFn<String[], KV<String, String>>() {
                        @ProcessElement
                        public void ProcessData(@Element String[] fields, OutputReceiver<KV<String, String>> out) {
                            // Emit one key-value pair per field, keyed as field:value with the guid as the value
                            if (fields[RedisIndex.GUID.getValue()] != null) {
                                out.output(KV.of("firstname:"
                                        .concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
                                out.output(KV.of("lastname:"
                                        .concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
                                out.output(KV.of("dob:"
                                        .concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()]));
                                out.output(KV.of("postalcode:"
                                        .concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()]));
                            }
                        }
                    }))
            .apply("Writing field indexes into redis",
                    // SADD adds each guid as a member of the set identified by the field:value key
                    RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
                            .withEndpoint(options.getRedisHost(), options.getRedisPort()));
    p.run();
}
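The code above references a RedisIndex enum that maps each field name to its column position in the pipe-separated input line. Its definition is in the repository linked below; a minimal sketch consistent with how it is used above (the constant names and getValue() come from the code, while the exact implementation shown here is my own illustration) would be:
/*
 * Illustrative sketch -- maps each field to its index in guid|firstname|lastname|dob|postalcode.
 */
public enum RedisIndex {
    GUID(0),
    FIRSTNAME(1),
    LASTNAME(2),
    DOB(3),
    POSTAL_CODE(4);

    private final int value;

    RedisIndex(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}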
You can clone the complete code from this GitHub repository. You can also refer to this documentation for designing your pipeline.
Executing the dataflow pipeline
We would have to execute the below command to create the dataflow template.
mvn compile exec:java \
-Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore \
-Dexec.args="--project=your-project-id \
--jobName=dataflow-memstore-job \
--inputFile=gs://cloud-dataflow-input-bucket/*.txt \
--redisHost=127.0.0.1 \
--stagingLocation=gs://dataflow-pipeline-batch-bucket/staging/ \
--dataflowJobFile=gs://dataflow-pipeline-batch-bucket/templates/dataflow-template \
--gcpTempLocation=gs://dataflow-pipeline-batch-bucket/tmp/ \
--runner=DataflowRunner"
Here,
- project: the project in which the dataflow pipeline job is created
- jobName: name of the dataflow pipeline job
- inputFile: the bucket from which the pipeline reads the input file
- redisHost: IP address of the running Redis instance
- dataflowJobFile: the bucket location where the dataflow template is created
- runner: DataflowRunner (for running the pipeline on Dataflow)
- stagingLocation and gcpTempLocation also need to be provided
Once the build is successful, the dataflow template will be created and a dataflow job will run.
The dataflow job is also represented as a graph summarizing the various stages of the pipeline. You can also check the logs.
Check the data inserted in Memorystore instance
To check whether the processed data is stored in the Redis instance after the dataflow pipeline has executed successfully, you must first connect to the Redis instance from a Compute Engine VM instance located within the same project, region and network as the Redis instance.
- Create a VM instance and SSH to it
- Install telnet from apt-get in the VM instance
sudo apt-get install telnet
- From the VM instance, connect to the ip-address of the Redis instance
telnet instance-ip-address 6379
- Once you are connected to Redis, check the inserted keys
keys *
- Check whether the data was inserted by using the set intersection command to get the guid
sinter firstname:<firstname> lastname:<lastname> dob:<dob> postalcode:<post-code>
- Check an individual entry using the below command to get the guid
smembers firstname:<firstname>
- Command to clear the Redis data store
flushall
You can read more about Redis commands in this documentation.
Finally, we have achieved what we wanted…
Dataflow pipeline jobs are champions when it comes to processing bulk data within seconds. I highly recommend you try it yourself and see how fast it is. Well, I have tried to attach as many resources as possible, and if you go through the code, it is fairly simple. Still, you will get the gist of it when you experiment on your own. 🙂
If this article provided you with value, please support my work — only if you can afford it. You can also connect with me on X. Thank you!