Striim Team


Building a Multi-Cloud Data Fabric for Real-Time Analytics


Data is increasingly siloed, which makes it harder for companies to get the most value from it. The problem is compounded by the fact that over 90% of companies plan to run hybrid cloud and/or multi-cloud operations by 2022.

Watch this on-demand webinar with James Serra (Data Platform Architecture Lead at EY) where we demystify building a multi-cloud data environment for operations and real-time analytics. We cover the following topics:

  • Pros and cons of multi-cloud vs doubling down on a single cloud
  • Enterprise data patterns such as Data Fabric, Data Mesh, and The Modern Data Stack
  • Data ingestion and data transformation in a multi-cloud/hybrid cloud environment
  • Comparison of data warehouses (Snowflake, Synapse, Redshift, BigQuery) for real-time workloads

Oracle Change Data Capture – An Event-Driven Architecture for Cloud Adoption

Tutorial


How to replace batch ETL by event-driven distributed stream processing

Benefits

Operational Analytics 
Use non-intrusive CDC to Kafka to create persistent streams that can be accessed by multiple consumers and automatically reflect upstream schema changes

Empower Your Teams
Give teams across your organization a real-time view of your Oracle database transactions.

Get Analytics-Ready Data
Get your data ready for analytics before it lands in the cloud. Process and analyze in-flight data with scalable streaming SQL.

Overview

All businesses rely on data. Historically, this data lived in monolithic databases, and batch ETL processes moved it to warehouses and other data stores for reporting and analytics. As businesses modernize, move analytics to the cloud, and push for real-time insights, they often find these databases hard to replace completely, even though the data and transactions inside them are essential for analytics. Over 80% of businesses report that the volume and velocity of their data are rapidly increasing, so scalable cloud adoption and change data capture from databases like Oracle, SQL Server, MySQL, and others are more critical than ever. Oracle change data capture in particular is an area where companies are seeing an influx of modern data integration use cases.

To address this, more and more companies are moving to event-driven architectures. Their distributed, dynamically scalable design makes it possible to share large volumes of data across systems.

In this post, we look at an example that replaces batch ETL with event-driven distributed stream processing. Oracle change data capture events are extracted as they are created, enriched with in-memory, SQL-based denormalization, and then delivered to MongoDB. The result is scalable, real-time, low-cost analytics that does not affect the source database. We also look at using the enriched events, optionally backed by Kafka, to incrementally add other event-driven applications or services.


Continuous Data Collection, Processing, Delivery, and Analytics with the Striim Platform

Event-Driven Architecture Patterns

Most business data is produced as a sequence of events, or an event stream. Web and mobile app interactions, devices, sensors, and bank transactions, for example, all generate events continuously. Even the current state of a database is the outcome of a sequence of events.

Treating state as the result of a sequence of events forms the core of several event-driven patterns.

Event Sourcing is an architectural pattern in which the state of the application is determined by a sequence of events. As an example, imagine that each “event” is an incremental update to an entry in a database. In this case, the state of a particular entry is simply the accumulation of events pertaining to that entry. In the example below the stream contains the queue of all deposit and withdrawal events, and the database table persists the current account balances.


Imagine Each Event as a Change to an Entry in a Database
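The deposit-and-withdrawal example can be sketched in a few lines of Python. This is an illustration of event sourcing, not Striim code. The account IDs and amounts are hypothetical, and amounts are stored as integer cents. Replaying the whole stream gives the current balances, and replaying a prefix gives the state as of an earlier point. The table of balances alone cannot recover that history.

```python
from collections import defaultdict

def replay_balances(events):
    """Rebuild account balances by applying deposit/withdrawal events in order."""
    balances = defaultdict(int)
    for account, kind, amount in events:
        if kind == "deposit":
            balances[account] += amount
        elif kind == "withdrawal":
            balances[account] -= amount
        else:
            raise ValueError(f"unknown event kind: {kind}")
    return dict(balances)

# Hypothetical event stream: (account, kind, amount in cents)
events = [
    ("acct-1", "deposit", 10000),
    ("acct-2", "deposit", 5000),
    ("acct-1", "withdrawal", 3000),
]

print(replay_balances(events))      # {'acct-1': 7000, 'acct-2': 5000}
print(replay_balances(events[:2]))  # balances as of the second event
```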

The events in the stream can be used to reconstruct the current account balances in the database, but not the other way around. Databases can be replicated with a technology called Change Data Capture (CDC). CDC monitors a source database’s change log, collects changes as soon as they are applied, turns them into a stream of events, and then applies those changes to a target database. Source code version control is another well-known example of this idea: the current state of a file is a base version plus the accumulation of all changes made to it.


The Change Log can be used to Replicate a Database
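Applying a change log to a replica can be sketched as below. The event shape is a simplifying assumption and not Striim’s event format: each change has an operation, a primary key, and (for inserts and updates) a full row image. Real CDC update events may carry only the changed columns.

```python
def apply_change(table, change):
    """Apply one CDC change event to an in-memory replica keyed by primary key."""
    op, key = change["op"], change["key"]
    if op in ("INSERT", "UPDATE"):
        table[key] = dict(change["row"])  # assumes a full row image
    elif op == "DELETE":
        table.pop(key, None)
    else:
        raise ValueError(f"unsupported operation: {op}")

# Hypothetical change log captured from a source table
change_log = [
    {"op": "INSERT", "key": 1, "row": {"id": 1, "name": "a"}},
    {"op": "INSERT", "key": 2, "row": {"id": 2, "name": "b"}},
    {"op": "UPDATE", "key": 1, "row": {"id": 1, "name": "a2"}},
    {"op": "DELETE", "key": 2},
]

replica = {}
for change in change_log:
    apply_change(replica, change)
print(replica)  # {1: {'id': 1, 'name': 'a2'}}
```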

What if you need the same set of data in different databases, for different types of use? With a stream, the same message can be processed by different consumers for different purposes. As shown below, the stream can act as a distribution point. Following the polyglot persistence pattern, events are delivered to a variety of data stores, each using the technology best suited to a particular use case or materialized view.


Streaming Events Delivered to a Variety of Data Stores
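The fan-out idea can be sketched with two toy consumers that read the same stream and maintain different views. One is a word index, standing in for a search store. The other keeps per-category counts, standing in for an analytics store. Both classes are hypothetical illustrations, not Striim components.

```python
from collections import Counter

class SearchIndex:
    """Toy search view: maps each lowercased word to the ids of events containing it."""
    def __init__(self):
        self.index = {}

    def handle(self, event):
        for word in event["text"].lower().split():
            self.index.setdefault(word, set()).add(event["id"])

class CountView:
    """Toy analytics view: counts events per category."""
    def __init__(self):
        self.counts = Counter()

    def handle(self, event):
        self.counts[event["category"]] += 1

def fan_out(stream, consumers):
    """Deliver every event, in order, to every consumer."""
    for event in stream:
        for consumer in consumers:
            consumer.handle(event)

stream = [
    {"id": 1, "category": "deposit", "text": "Payroll deposit"},
    {"id": 2, "category": "withdrawal", "text": "ATM withdrawal"},
    {"id": 3, "category": "deposit", "text": "Refund deposit"},
]
search, counts = SearchIndex(), CountView()
fan_out(stream, [search, counts])
print(sorted(search.index["deposit"]))  # [1, 3]
print(counts.counts["deposit"])         # 2
```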

Event-Driven Streaming ETL Use Case Example

Below is a diagram of the Event-Driven Streaming ETL use case example:


Event-Driven Streaming ETL Use Case Diagram

  1. Striim’s low-impact, real-time Oracle change data capture (CDC) feature streams database changes (inserts, updates, and deletes) from an operational Oracle database into Striim.

  2. CDC events are enriched and denormalized with streaming SQL and cached data, so that relevant data is available together.

  3. Enriched, denormalized events are streamed to a NoSQL target for real-time analytics. The diagram shows Azure Cosmos DB; the steps below use MongoDB.

  4. Enriched streaming events can be monitored in real time with the Striim Web UI. They are also available for further streaming SQL analysis, wizard-based dashboards, and other applications in the cloud. To use Striim, sign up for a free Striim Developer account or a Striim Cloud trial.

Striim can simultaneously ingest data from other sources, such as Kafka and log files, so all data is streamed with equal consistency. Follow the instructions below to build an Oracle CDC to MongoDB real-time streaming application:

Step 1: Generate Schemas in Your Oracle Database

You can find the CSV data files in our GitHub repository. Use the following schemas to create two empty tables in your source database.

The HOSPITAL_DATA table contains details about each hospital. It is used as a cache to enrich the real-time data stream.

Schema:

				
CREATE TABLE "<database name>"."HOSPITAL_DATA" (
    "PROVIDER_ID" VARCHAR2(10),
    "HOSPITAL_NAME" VARCHAR2(50),
    "ADDRESS" VARCHAR2(50),
    "CITY" VARCHAR2(50),
    "STATE" VARCHAR2(40),
    "ZIP_CODE" VARCHAR2(10),
    "COUNTY" VARCHAR2(40),
    "PHONE_NUMBER" VARCHAR2(15),
    PRIMARY KEY ("PROVIDER_ID")
);
				
			

Insert the data from the hospital CSV file into the HOSPITAL_DATA table.

The HOSPITAL_COMPLICATIONS_DATA table contains details of complications at various hospitals. In production this data would be streamed in real time. For this tutorial, we import CSV data into the table so that CDC can capture the changes.

Schema:

				
CREATE TABLE <database name>.HOSPITAL_COMPLICATIONS_DATA (
    COMPLICATION_ID NUMBER(10,0) NOT NULL,
    PROVIDER_ID VARCHAR2(10) NULL,
    MEASURE_NAME VARCHAR2(100) NULL,
    MEASURE_ID VARCHAR2(40) NULL,
    COMPARED_TO_NATIONAL VARCHAR2(50) NULL,
    DENOMINATOR VARCHAR2(20) NULL,
    SCORE VARCHAR2(20) NULL,
    LOWER_ESTIMATE VARCHAR2(40) NULL,
    HIGHER_ESTIMATE VARCHAR2(20) NULL,
    FOOTNOTE VARCHAR2(400) NULL,
    MEASURE_START_DT DATE NULL,
    MEASURE_END_DT DATE NULL
);
				
			

Step 2: Replacing Batch Extract with Real-Time Streaming of CDC Events

Striim’s easy-to-use CDC wizards automate the creation of applications that leverage change data capture, to stream events as they are created, from various source systems to various targets. In this example, shown below, we use Striim’s OracleReader (Oracle Change Data Capture) to read the hospital incident data in real-time and stream these insert, update, delete operations, as soon as the transactions commit, into Striim, without impacting the performance of the source database. Configure your source database by entering the hostname, username, password and table names.
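The queries in the following steps refer to columns by position, as data[n]. Those queries imply that the OracleReader event’s data array lists column values in the order of the Step 1 DDL. Under that assumption, the Python sketch below (not Striim code) maps positions to column names so the indexes used later are easy to check. The sample row is hypothetical.

```python
# Column order copied from the HOSPITAL_COMPLICATIONS_DATA DDL in Step 1.
COLUMNS = [
    "COMPLICATION_ID", "PROVIDER_ID", "MEASURE_NAME", "MEASURE_ID",
    "COMPARED_TO_NATIONAL", "DENOMINATOR", "SCORE", "LOWER_ESTIMATE",
    "HIGHER_ESTIMATE", "FOOTNOTE", "MEASURE_START_DT", "MEASURE_END_DT",
]
POSITION = {name: i for i, name in enumerate(COLUMNS)}

def column(data, name):
    """Read a named column from a positional data[] array."""
    return data[POSITION[name]]

def as_record(data):
    """Turn a positional data[] array into a column-name -> value dict."""
    if len(data) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} values, got {len(data)}")
    return dict(zip(COLUMNS, data))

print(POSITION["COMPARED_TO_NATIONAL"])   # 4, the index read in Step 4
print([COLUMNS[i] for i in range(5, 9)])  # the columns cleaned up in Step 3
```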

Step 3: NULL Value Handling

Some rows contain the string “Not Available”. Striim can convert these strings to NULL values in real time using a Continuous Query component. Use the following query to change “Not Available” strings to NULL:

				
					SELECT
t
FROM complication_data_stream t
MODIFY
(
data[5] = CASE WHEN TO_STRING(data[5]) == "Not Available" THEN NULL else TO_STRING(data[5]) END,
data[6] = CASE WHEN TO_STRING(data[6]) == "Not Available" THEN NULL else TO_STRING(data[6]) END,
data[7] = CASE WHEN TO_STRING(data[7]) == "Not Available" THEN NULL else TO_STRING(data[7]) END,
data[8] = CASE WHEN TO_STRING(data[8]) == "Not Available" THEN NULL else TO_STRING(data[8]) END
);
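The MODIFY clause above can be mirrored in plain Python to show what happens to each value. This is a sketch, not Striim code. It assumes a TO_STRING of a NULL stays NULL, so None is left untouched. Positions 5 to 8 are the DENOMINATOR, SCORE, LOWER_ESTIMATE, and HIGHER_ESTIMATE columns. The sample row is hypothetical.

```python
NOT_AVAILABLE = "Not Available"

def nullify(data, positions=range(5, 9)):
    """Copy data[], replacing "Not Available" with None at the given positions.

    Other non-None values at those positions become strings, mirroring the
    TO_STRING(...) in the ELSE branch of the query.
    """
    out = list(data)
    for i in positions:
        if out[i] is None:
            continue
        text = str(out[i])
        out[i] = None if text == NOT_AVAILABLE else text
    return out

row = [1, "P1", "Measure", "M1", "Worse than the National Rate",
       "Not Available", 2.5, "Not Available", "3.1", None]
print(nullify(row)[5:9])  # [None, '2.5', None, '3.1']
```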
				
			

Step 4: Using Continuous Query for Data Processing

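The HOSPITAL_COMPLICATIONS_DATA table has a column, COMPARED_TO_NATIONAL, that indicates how a particular complication compares to the national average. We process this column to generate a more readable field called SCORE_COMPARISON, whose value is GOOD, BAD, OUTLIER, or NULL. The mapping works as follows:

If the value is “Not Available” or “Number of Cases Too Small”, SCORE_COMPARISON is OUTLIER.
If it is “Worse than the National Rate”, SCORE_COMPARISON is BAD.
If it is “Better than the National Rate” or “No Different than the National Rate”, SCORE_COMPARISON is GOOD.
Any other value yields NULL.

The query attaches SCORE_COMPARISON to each event as user data with putUserData.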

				
					SELECT
CASE WHEN TO_STRING(data[4]) =="Not Available" or TO_STRING(data[4]) =="Number of Cases Too Small"
THEN putUserData(t, 'SCORE_COMPARISON', "OUTLIER")
WHEN TO_STRING(data[4]) =="Worse than the National Rate"
THEN putUserData(t, 'SCORE_COMPARISON', "BAD")
WHEN TO_STRING(data[4]) =="Better than the National Rate" OR TO_STRING(data[4]) =="No Different than the National Rate"
THEN putUserData(t, 'SCORE_COMPARISON', "GOOD")
ELSE putUserData(t, 'SCORE_COMPARISON', NULL)
END
FROM nullified_stream2 t;
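The CASE expression above amounts to a small lookup. Here it is as a Python sketch for checking the mapping outside Striim. It uses exact, case-sensitive string matches, like the == comparisons in the query.

```python
OUTLIER = {"Not Available", "Number of Cases Too Small"}
BAD = {"Worse than the National Rate"}
GOOD = {"Better than the National Rate", "No Different than the National Rate"}

def score_comparison(compared_to_national):
    """Map a COMPARED_TO_NATIONAL value (data[4]) to a SCORE_COMPARISON label."""
    text = None if compared_to_national is None else str(compared_to_national)
    if text in OUTLIER:
        return "OUTLIER"
    if text in BAD:
        return "BAD"
    if text in GOOD:
        return "GOOD"
    return None  # any other value, like the ELSE branch

for value in ["Worse than the National Rate", "Number of Cases Too Small", "unexpected"]:
    print(value, "->", score_comparison(value))
```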
				
			

Step 5: Utilizing Caches For Enrichment

Relational databases typically have a normalized schema. This makes storage efficient, but it requires joins at query time and does not scale well horizontally. NoSQL databases typically have a denormalized schema, which scales across a cluster because data that is read together is stored together.

With a normalized schema, many of the data fields are IDs. This is very efficient for the database. However, IDs carry no meaning or context on their own, so they are not very useful for downstream queries or analytics. In this example, we enrich the raw HOSPITAL_COMPLICATIONS_DATA events with reference data from the HOSPITAL_DATA table, correlated by PROVIDER_ID. The result is a denormalized record that includes the hospital’s name, state, and phone number, which makes analysis easier.

Because Striim is a high-speed, low-latency, SQL-based stream processing platform, reference data also needs to be in memory so it can be joined with the streaming data without slowing things down. The Cache component handles this. Within Striim, caches are backed by a distributed in-memory data grid that can hold millions of reference items spread across a Striim cluster. Caches can be loaded from database queries, Hadoop, or files, and because they keep data in memory, joins against them are very fast. In this example, the cache is loaded with a query on the HOSPITAL_DATA table using the Striim DatabaseReader.

First, define a type for the cache by running the following statement in the Striim console.

				
					CREATE TYPE  HospitalDataType(
PROVIDER_ID String KEY,
HOSPITAL_NAME String,
ADDRESS String,
CITY String,
STATE String,
ZIP_CODE String,
COUNTY String,
PHONE_NUMBER String
);
				
			

 

Now, drag a DB Cache component from the list of Striim components on the left and enter your database details. The cache runs the following query against the HOSPITAL_DATA table and keeps the results in memory for joining with the streaming data.

Query:

				
SELECT PROVIDER_ID,HOSPITAL_NAME,ADDRESS,CITY,STATE,ZIP_CODE,COUNTY,PHONE_NUMBER FROM <database name>.HOSPITAL_DATA;
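What the cache holds can be pictured as an in-memory map keyed on the KEY field of HospitalDataType. The Python sketch below is only an illustration of that lookup structure, not Striim’s distributed data grid. The rows are hypothetical and follow the column order of the query above.

```python
from typing import NamedTuple

class HospitalData(NamedTuple):
    """Mirrors the HospitalDataType fields; PROVIDER_ID is the lookup key."""
    PROVIDER_ID: str
    HOSPITAL_NAME: str
    ADDRESS: str
    CITY: str
    STATE: str
    ZIP_CODE: str
    COUNTY: str
    PHONE_NUMBER: str

def load_cache(rows):
    """Load query rows (in SELECT column order) into a dict keyed on PROVIDER_ID."""
    cache = {}
    for row in rows:
        record = HospitalData(*row)
        cache[record.PROVIDER_ID] = record  # a repeated key overwrites the earlier row
    return cache

# Hypothetical rows
rows = [
    ("P1", "General Hospital", "1 Main St", "Springfield", "AL", "36301", "Houston", "3345551234"),
    ("P2", "County Medical Center", "2 Oak Ave", "Riverton", "AL", "35957", "Marshall", "2565559876"),
]
cache = load_cache(rows)
print(cache["P2"].HOSPITAL_NAME)  # County Medical Center
```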
				
			

Step 6: Joining Streaming and Cache Data For Real Time Transforming and Enrichment With SQL

We can process and enrich data in motion using continuous queries written in Striim’s SQL-based stream processing language. A SQL-based language is intuitive for data processing tasks, and most common SQL constructs can be used in a streaming environment. Using SQL for stream processing differs from its traditional use as a database query language in two main ways. All processing happens in memory, and data is processed continuously, so every event on a query’s input stream can produce an output.

This is the query we will use to process and enrich the incoming data stream:

				
					SELECT data[1] as provider_id, data[2] as Measure_Name, data[3] as Measure_id,
t.HOSPITAL_NAME as hosp_name,
t.state as cache_state, 
t.phone_number as cache_phone
FROM score_comparison_stream n, hospital_data_cache t where t.provider_id=TO_STRING(n.data[1]);
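The stream-to-cache join can be sketched in Python as a lookup per event. This is an illustration, not Striim’s implementation. Like an inner join, it drops events whose PROVIDER_ID has no cache entry. The output keys copy the aliases in the query above, and the cache contents and events are hypothetical.

```python
def enrich(events, cache):
    """Inner-join CDC events (data[] arrays) with the hospital cache on PROVIDER_ID (data[1])."""
    for data in events:
        hospital = cache.get(str(data[1]))
        if hospital is None:
            continue  # no cache entry for this provider: an inner join emits nothing
        yield {
            "provider_id": data[1],
            "Measure_Name": data[2],
            "Measure_id": data[3],
            "hosp_name": hospital["HOSPITAL_NAME"],
            "cache_state": hospital["STATE"],
            "cache_phone": hospital["PHONE_NUMBER"],
        }

# Hypothetical cache contents and CDC events
cache = {"P1": {"HOSPITAL_NAME": "General Hospital", "STATE": "AL", "PHONE_NUMBER": "3345551234"}}
events = [
    [1, "P1", "Measure A", "M1"],
    [2, "P9", "Measure B", "M2"],  # no matching hospital
]
enriched = list(enrich(events, cache))
print(len(enriched))  # 1
```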
				
			

In this query, we enrich the streaming data with cached hospital details. The query continuously outputs an enriched (denormalized) event for every CDC event on the HOSPITAL_COMPLICATIONS_DATA table whose PROVIDER_ID matches an entry in the cache. With this approach, we can join streams from an Oracle change data capture reader with cached data for enrichment.

Step 7: Loading the Enriched Data to the Cloud for Real Time Analytics

The Oracle CDC data, streamed and enriched through Striim, can now be written to MongoDB using Striim’s MongoDB Writer. Enter the connection URL for your MongoDB database in the following format, along with your username, password, and collection name.

mongodb+srv://<username>:<password>@<hostname>/
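Usernames and passwords that contain characters such as @ or : must be percent-encoded in a MongoDB connection string. The Python sketch below builds the URL with the standard library’s urllib.parse.quote_plus. The credentials and hostname are hypothetical.

```python
from urllib.parse import quote_plus

def mongodb_srv_uri(username, password, hostname):
    """Build a mongodb+srv connection string, percent-encoding the credentials."""
    return f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{hostname}/"

print(mongodb_srv_uri("striim", "p@ss:word", "cluster0.example.mongodb.net"))
# mongodb+srv://striim:p%40ss%3Aword@cluster0.example.mongodb.net/
```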

Step 8: Running the Oracle CDC to MongoDB Streaming Application

Now that the Striim app is configured, deploy and run the CDC application. You can also download the TQL file (passphrase: striimrecipes) from our GitHub repository and configure your own source and targets. After the Striim app has started running, import the CSV file into the HOSPITAL_COMPLICATIONS_DATA table in your source database. The change data then flows through the various components for enrichment and processing. Click the ‘eye’ icon next to each stream component to see its data in real time.

Using Kafka for Streaming Replay and Application Decoupling

The enriched event stream can be backed by, or published to, Kafka for stream persistence, laying the foundation for streaming replay and application decoupling. Striim’s native integration with Apache Kafka makes it quick and easy to use Kafka to make every data source replayable, enabling recovery even for streaming sources that cannot be rewound. This also decouples applications: multiple applications can be powered by the same data source, and new applications, caches, or views can be added later.
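Replay and decoupling both come from one property: the log is retained, and each consumer tracks its own read position. The Python class below is a hypothetical in-memory stand-in that shows this property. It is not Kafka’s API and has no partitions or durability.

```python
class ReplayableLog:
    """Append-only event log where each consumer tracks its own read offset."""

    def __init__(self):
        self._events = []
        self._offsets = {}

    def append(self, event):
        self._events.append(event)
        return len(self._events) - 1  # offset of the new event

    def poll(self, consumer, max_events=100):
        """Return the next batch for this consumer and advance its offset."""
        start = self._offsets.get(consumer, 0)
        batch = self._events[start:start + max_events]
        self._offsets[consumer] = start + len(batch)
        return batch

    def seek(self, consumer, offset):
        """Rewind (or skip) a consumer to a given offset, enabling replay."""
        self._offsets[consumer] = offset

log = ReplayableLog()
for event in ["e0", "e1", "e2"]:
    log.append(event)

print(log.poll("analytics"))  # ['e0', 'e1', 'e2']
print(log.poll("audit", 2))   # a later consumer still sees the history: ['e0', 'e1']
log.seek("analytics", 0)
print(log.poll("analytics"))  # replay from the start
```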

Streaming SQL for Aggregates

We can also run Striim’s streaming SQL on the denormalized data to produce a real-time stream of summary metrics about the processed events. That stream can feed Striim real-time dashboards and other applications. For example, to keep a running count of each measure_id over the last hour from the stream of enriched events, you would use a window and the familiar GROUP BY clause:

				
CREATE WINDOW IncidentWindow
OVER EnrichCQ
KEEP WITHIN 1 HOUR
PARTITION BY Measure_id;

SELECT Measure_id,
COUNT(*) as MeasureCount
FROM IncidentWindow
GROUP BY Measure_id;
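The one-hour window plus GROUP BY can be pictured as a sliding count per key. The Python sketch below is an approximation of that behavior, not Striim’s windowing engine. It assumes events arrive with non-decreasing timestamps in seconds, and it expires an event once it is a full window old.

```python
from collections import Counter, deque

class SlidingCount:
    """Count events per key over the last `window_seconds` seconds."""

    def __init__(self, window_seconds=3600):
        self.window = window_seconds
        self.events = deque()  # (timestamp, key) in arrival order
        self.counts = Counter()

    def add(self, ts, key):
        """Add an event and return the current per-key counts."""
        self.events.append((ts, key))
        self.counts[key] += 1
        self._expire(ts)
        return dict(self.counts)

    def _expire(self, now):
        # Drop events that have fallen out of the window.
        while self.events and self.events[0][0] <= now - self.window:
            _, old_key = self.events.popleft()
            self.counts[old_key] -= 1
            if self.counts[old_key] == 0:
                del self.counts[old_key]

w = SlidingCount(3600)
w.add(0, "A")
w.add(10, "B")
print(w.add(20, "A"))    # {'A': 2, 'B': 1}
print(w.add(3605, "B"))  # the event at t=0 has expired: {'A': 1, 'B': 2}
```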
				
			

Monitoring

With the Striim Monitoring Web UI, we can monitor the data pipeline with real-time information about the cluster, application components, servers, and agents. The main Monitor page shows summary statistics for Events Processed, App CPU%, Server Memory, and Server CPU%. The Monitor App page displays the app’s resources, performance, and components.

Summary

In this blog post, we discussed how we can use Striim to:

  1. Perform Oracle change data capture to stream database changes in real time

  2. Use streaming SQL and caches to easily denormalize data in order to make relevant data available together

  3. Load enriched streaming data into MongoDB for real-time analytics

  4. Use Kafka for persistent streams

  5. Create rolling aggregates with streaming SQL

  6. Continuously monitor data pipelines

Wrapping Up

Striim’s power is in its ability to ingest data from various sources and stream it to the same (or different) destinations. This means data going through Striim is held to the same standard of replication, monitoring, and reliability.

 

In conclusion, this recipe showed a shift from batch ETL to event-driven distributed stream processing. We captured Oracle change data events in real time, enriched them through in-memory, SQL-based denormalization, and delivered them to MongoDB. The result was scalable, cost-effective analytics without disrupting the source database. The enriched events, optionally backed by Kafka, also make it possible to add other event-driven applications or services incrementally.

To try anything you’ve seen in this recipe, sign up for Striim Developer or a free Striim Cloud trial.

Stream Data from PostgreSQL to Google BigQuery with Striim Cloud – Part 1

Tutorial


Use Striim Cloud to stream data securely from PostgreSQL database into Google BigQuery

Benefits

Operational Analytics
Analyze real-time operational, transactional data from PostgreSQL in BigQuery

Secure Data Transfer
Secure data-at-rest and in-flight with simple SSH tunnel configuration and automatic encryption

Build Real-Time Analytical Models
Use the power of real-time data streaming to build real-time analytical and ML models

Overview

Striim is a next-generation cloud data integration product that offers change data capture (CDC), enabling continuous replication from popular databases such as Oracle, SQL Server, PostgreSQL, and many others.

In addition to CDC connectors, Striim has hundreds of automated adapters for file-based data (logs, XML, CSV), IoT data (OPC UA, MQTT), and applications such as Salesforce and SAP. Our SQL-based stream processing engine makes it easy to enrich and normalize data before it’s written to targets like BigQuery and Snowflake.

Traditionally, data has been loaded into data warehouses with batch processing. With Striim’s streaming platform, data can be replicated efficiently in real time.

Securing in-flight data is very important in any real-world application. A jump host is a publicly reachable server that provides a controlled, encrypted SSH path into the secure environment where the database runs.

In this tutorial, we walk you through creating a secure SSH tunnel between Striim Cloud and your on-premises or cloud databases. The example streams data securely from a PostgreSQL database into Google BigQuery through SSH tunneling.

Core Striim Components

PostgreSQL Reader: PostgreSQL Reader uses the wal2json plugin to read PostgreSQL change data. 1.x releases of wal2json cannot read transactions larger than 1 GB.

Stream: A stream passes one component’s output to one or more other components. For example, in a simple flow that only writes to a file, a stream carries events from the source to the file writer.

BigQueryWriter: Striim’s BigQueryWriter writes the data from various supported sources into Google’s BigQuery data warehouse to support real time data warehousing and reporting.
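To show what PostgreSQL Reader consumes, here is a Python sketch that parses one wal2json message in the plugin’s format-version 1 shape: a "change" array whose entries carry kind, schema, table, column names and values, and old keys. This is not how Striim’s reader is implemented. The table and values are hypothetical.

```python
import json

def parse_wal2json_v1(payload):
    """Yield simplified change records from one wal2json format-version-1 message."""
    message = json.loads(payload)
    for change in message.get("change", []):
        record = {"op": change["kind"], "table": f'{change["schema"]}.{change["table"]}'}
        if "columnnames" in change:  # inserts and updates carry the new row
            record["row"] = dict(zip(change["columnnames"], change["columnvalues"]))
        if "oldkeys" in change:  # deletes (and, depending on replica identity, updates) carry the old key
            old = change["oldkeys"]
            record["key"] = dict(zip(old["keynames"], old["keyvalues"]))
        yield record

# Illustrative message for a hypothetical public.orders table
payload = json.dumps({
    "xid": 1001,
    "change": [
        {"kind": "insert", "schema": "public", "table": "orders",
         "columnnames": ["id", "amount"], "columntypes": ["integer", "numeric"],
         "columnvalues": [1, 9.5]},
        {"kind": "delete", "schema": "public", "table": "orders",
         "oldkeys": {"keynames": ["id"], "keytypes": ["integer"], "keyvalues": [1]}},
    ],
})
for record in parse_wal2json_v1(payload):
    print(record)
```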

(Optional) Step 1: Secure connectivity to your database

Striim provides multiple methods for secure connectivity to your database. For Striim Developer, you can allowlist the IP address provided in your email invite.

In Striim Cloud, you can either allowlist your service’s IP address or follow the steps below to configure a jump server.

Follow the steps below to set up your jump server on Google Compute Engine:

Go to the Google Cloud console -> Compute Engine -> VM instances and create a new VM instance to act as the jump server.

Add the jump server’s IP address to the authorized networks of the source database (in this case, the PostgreSQL instance).

Step 2: Create SSH tunnel on Striim Cloud

Once the jump host is set up, create an SSH tunnel from the Striim Cloud UI to connect to the source database through the jump server.

Follow the steps below to configure an SSH tunnel between the source database and Striim Cloud:

Go to the Striim Cloud console and launch a service instance. Under Security, create a new SSH tunnel and configure it to point at your jump server.

Go to the jump server (VM instance) and add the service key copied in the previous step.

Striim Cloud can now reach the PostgreSQL instance through the jump server.

Step 3: Launch Striim Server and Connect the Postgres Instance

For this recipe, we host the app in Striim Cloud. You can also use a free trial to try Striim’s change data capture yourself.

Follow the steps below to connect the Striim server to the PostgreSQL instance that holds the source database:

Click Apps to display the app management screen.

Click Create app.

Under Create app from wizard, select your source and target.

Give your app a name and establish the connection between the Striim server and the PostgreSQL instance.



Once the connection between PostgreSQL and the Striim server is established, link the target data warehouse, in this case Google BigQuery. Striim also offers a schema conversion feature that validates table schemas on both source and target, which helps migrate the source schema to the target database.


Step 4: Targeting Google Bigquery

Make sure your BigQuery dataset mirrors the tables in the source database. You can set this up in the Google Cloud console. Under your project in BigQuery, create a dataset and an empty table containing all the columns that will be populated with data migrated from the PostgreSQL database.
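If you prefer to script the mirrored table, a DDL statement can be generated from the source column list. The type mapping below is an assumption: it is one reasonable PostgreSQL-to-BigQuery correspondence, and it is not the mapping Striim’s BigQueryWriter is documented to require. Check it against your columns, and note that BigQuery NUMERIC has a fixed precision and scale. The dataset, table, and column names are hypothetical.

```python
# Assumed PostgreSQL -> BigQuery type mapping (verify for your schema).
PG_TO_BQ = {
    "smallint": "INT64", "integer": "INT64", "bigint": "INT64",
    "numeric": "NUMERIC", "real": "FLOAT64", "double precision": "FLOAT64",
    "boolean": "BOOL", "text": "STRING", "varchar": "STRING",
    "date": "DATE", "timestamp": "DATETIME", "timestamptz": "TIMESTAMP",
}

def bigquery_ddl(dataset, table, columns):
    """Build a BigQuery CREATE TABLE statement from (name, postgres_type) pairs.

    Raises KeyError for a PostgreSQL type that is not in PG_TO_BQ.
    """
    defs = ",\n".join(f"  {name} {PG_TO_BQ[pg_type]}" for name, pg_type in columns)
    return f"CREATE TABLE `{dataset}.{table}` (\n{defs}\n);"

print(bigquery_ddl("my_dataset", "orders",
                   [("id", "integer"), ("amount", "numeric"), ("created_at", "timestamptz")]))
```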

Follow the steps below to create a new dataset in BigQuery and integrate it with the Striim app using a service account:

Create a dataset with tables mirroring the source schema.

Go back to the app wizard and enter the service account key for your BigQuery project to connect the app to the target data warehouse.

The Striim server is now connected to both PostgreSQL and BigQuery, and we are ready to configure the Striim app for data migration.

Step 5: Configure Striim app using UI

With the source, target, and Striim server integrated for data migration, a few settings need to be configured in the app UI before you deploy and run the app.

  • Click the source and add the connection URL (tunnel address), username, and password in the proper format.
  • Click the target, add the input stream and the source and target tables, and upload the service account key for BigQuery.

    The app is now ready for deployment and data migration.
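The source connection URL points at the tunnel address rather than at the database host. The sketch below assumes the tunnel address is a host:port pair and that the reader accepts the standard PostgreSQL JDBC form, jdbc:postgresql://host:port/database. Both are assumptions to check against the Striim documentation. The address and database name are hypothetical.

```python
def postgres_jdbc_url(tunnel_address, database):
    """Build a PostgreSQL JDBC URL from a 'host:port' tunnel address and a database name."""
    host, sep, port = tunnel_address.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {tunnel_address!r}")
    return f"jdbc:postgresql://{host}:{port}/{database}"

print(postgres_jdbc_url("10.0.0.5:5432", "postgres"))  # jdbc:postgresql://10.0.0.5:5432/postgres
```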

Step 6: Deploy and Run the Striim app for Fast Data Migration

In this section, you deploy and run the final app to see Striim’s change data capture in action.

Setting up the Postgres to BigQuery Streaming App

Step 1: Follow the recipe to configure your jump server and SSH tunnel to Striim Cloud.

Step 2: Set up your Postgres source and BigQuery target.

Step 3: Create the Postgres to BigQuery Striim app using the wizard, as shown in the recipe.

Step 4: Migrate your data from source to target by deploying your Striim app.

Wrapping Up: Start Your Free Trial

Our tutorial showed you how easy it is to migrate data from PostgreSQL to Google BigQuery, a leading cloud data warehouse. By continuously moving your data into BigQuery, you can start building analytics or machine learning models on top of it, all with minimal impact on your current systems. You can also ingest and normalize more datasets with Striim to take full advantage of your data combined with the power of BigQuery.

As always, feel free to reach out to our integration experts to schedule a demo, or try Striim for free here.

Tools you need

Striim

Striim’s unified data integration and streaming platform connects clouds, data and applications.

PostgreSQL

PostgreSQL is an open-source relational database management system.

Google BigQuery

BigQuery is a serverless, highly scalable multicloud data warehouse.

Google Compute Engine

Google Compute Engine offers a scalable number of virtual machines. In this use case, a VM instance serves as the jump host for SSH tunneling.

Real-Time Hotspot Detection For Transportation with Striim and BigQuery

Tutorial


Detect and visualize cab booking hotspots using Striim and BigQuery

Benefits

Analyze Real-Time Operational Data
Striim’s fast data transfer makes it possible to track real-time booking data for strategic decisions

Detect Hotspots in Real Time
With Striim’s live dashboard, millions of events can be processed to capture booking updates at different geographical locations

Perform Real-Time Analytics with Fast Data Replication
With CDC technology, live data from multiple sources can be replicated to the cloud, where multiple teams can access it for analytics

Overview

Transportation services like Uber and Lyft need streaming data with real-time processing to get actionable, minute-by-minute insights. Batch data can provide powerful insight into medium- and long-term trends, but live data analytics is now an essential part of enterprise decision making. It is often said that data loses its importance within 30 minutes of being generated. To support companies that depend heavily on live data, Striim offers continuous, real-time ingestion from multiple data sources. With Striim’s log-based change data capture platform, database transactions can be captured and processed in real time while the data is migrated to multiple clouds. E-commerce, food delivery, transportation, and many other services that use real-time analytics to generate value can all apply this technology. In this blog, I show how real-time cab booking data can be streamed into Striim’s platform and processed in flight for real-time visualization in a Striim dashboard, while the same data is migrated to BigQuery.

Video Walkthrough

Core Striim Components

File Reader: Reads files from disk using a compatible parser.

Stream: A stream passes one component’s output to one or more other components. For example, in a simple flow that only writes to a file, a stream carries events from the source to the file writer.

Continuous Query: Striim continuous queries are continually running SQL queries that act on real-time data and may be used to filter, aggregate, join, enrich, and transform events.

Window: A window bounds real-time data by time, event count or both. A window is required for an application to aggregate or perform calculations on data, populate the dashboard, or send alerts when conditions deviate from normal parameters.

WAction and WActionStore: A WActionStore stores event data from one or more sources based on criteria defined in one or more queries. These events may be related using common key fields.

BigQueryWriter: Striim’s BigQueryWriter writes the data from various supported sources into Google’s BigQuery data warehouse to support real time data warehousing and reporting.

Dashboard: A Striim dashboard gives you a visual representation of data read and written by a Striim application.

Using Striim for CDC

Change Data Capture has gained popularity over the last decade as companies have realized the power of real-time analysis of data from OLTP databases. In this example, data is read from a CSV file and streamed through a 30-minute window. The window was kept at 30 minutes because of memory limitations and for better visualization. Striim can handle windows as short as 1 second even when data volumes are huge. The data is also processed inside Striim, and the captured changes are migrated to the BigQuery data warehouse.

The dataset used in this example contains 4.5 million Uber bookings in NYC, each with a date/time, latitude, longitude, and TLC base company. The goal is to stream the data through Striim’s CDC platform and detect the areas with the most bookings (hotspots) every 30 minutes. A query converts the latitude and longitude values to cluster them into discrete areas.

The following steps were completed before deploying the app:

Step 1: Reading Data from CSV and Streaming into Continuous Query through stream

In this step, the FileReader adapter (cabCsvSource) reads the CSV source using a delimiter-separated values parser. The data is then streamed into cabCSvSourceStream, which feeds the continuous query cabCQ. The SQL query at CabCQ2 converts the incoming data into the required format.
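For readers who want to inspect the data outside Striim, here is a Python sketch of the parse-and-convert step. It assumes the CSV has a header row named Date/Time, Lat, Lon, Base, with timestamps like 4/1/2014 0:11:00. That layout is an assumption about the dataset, and the sample rows are illustrative. The record field names are hypothetical and are not the names used in the Striim app.

```python
import csv
import io
from datetime import datetime

def parse_bookings(lines):
    """Parse 'Date/Time,Lat,Lon,Base' rows into typed booking records."""
    for row in csv.DictReader(lines):
        yield {
            "pickup_time": datetime.strptime(row["Date/Time"], "%m/%d/%Y %H:%M:%S"),
            "lat": float(row["Lat"]),
            "lon": float(row["Lon"]),
            "base": row["Base"],
        }

# Illustrative rows in the assumed format
sample = io.StringIO(
    "Date/Time,Lat,Lon,Base\n"
    "4/1/2014 0:11:00,40.7690,-73.9549,B02512\n"
    "4/1/2014 0:17:00,40.7267,-74.0345,B02512\n"
)
bookings = list(parse_bookings(sample))
print(bookings[0]["pickup_time"], bookings[0]["lat"], bookings[0]["base"])
```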

Step 2: Sending the data for processing in Striim as well as into BigQuery

The data returned by the continuous query is then sent through a 30-minute window for processing and is also written to BigQuery for storage. The Striim platform can migrate and process the same data at the same time. Teams can use the data transferred to BigQuery for analytics, while Striim’s processing provides insights through its dashboard.

The connection between Striim and BigQuery is set up through a service account that allows the Striim application to access BigQuery. A table that replicates the schema of the incoming data stream is created in the BigQuery dataset.

Step 3: Aggregating Data using Continuous Query

After the data is captured, a query runs on each window. It clusters the latitudes and longitudes of pickup locations into discrete areas and returns the aggregate demand for each area in those 30 minutes. The clustering query that feeds BigQuery (AggregateCQ) differs slightly from the one that feeds StriimWaction (LatLonCQ). The data in StriimWaction drives the dashboard that tracks hotspots, so the first latitude and longitude values are taken as the estimate of each area. The data that goes into BigQuery keeps all latitude and longitude values and can be used for further analytics.
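The app’s clustering query is not reproduced here, so the Python sketch below swaps in a simple, hypothetical rule: snap coordinates to a grid by rounding to two decimal places. It then counts bookings per area in 30-minute tumbling windows. It illustrates the window-and-aggregate idea and is not the author’s query. The sample bookings are illustrative.

```python
from collections import Counter
from datetime import datetime

def window_start(ts, minutes=30):
    """Floor ts to the start of its tumbling window (minutes should divide 60)."""
    return ts.replace(minute=ts.minute - ts.minute % minutes, second=0, microsecond=0)

def area(lat, lon, precision=2):
    """Hypothetical clustering rule: snap coordinates to a grid by rounding."""
    return (round(lat, precision), round(lon, precision))

def hotspot_counts(bookings, minutes=30, precision=2):
    """Count bookings per (window start, area) pair."""
    counts = Counter()
    for ts, lat, lon in bookings:
        counts[(window_start(ts, minutes), area(lat, lon, precision))] += 1
    return counts

bookings = [
    (datetime(2014, 4, 1, 0, 11), 40.7690, -73.9549),
    (datetime(2014, 4, 1, 0, 17), 40.7712, -73.9521),
    (datetime(2014, 4, 1, 0, 45), 40.7701, -73.9530),
    (datetime(2014, 4, 1, 0, 20), 40.7267, -74.0345),
]
for (start, cell), n in sorted(hotspot_counts(bookings).items()):
    print(start.strftime("%H:%M"), cell, n)
```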

Striim’s Dashboard for Real Time Analytics

The dashboard ingests data from two different streams. The window30CabData stream feeds the bar chart that tracks the number of vehicles from each company every 30 minutes. The vector map fetches data from LatLon2dashboardWaction, which holds the aggregated count of bookings for every area in a 30-minute window. The dashboard shows red for high demand (more than 30), yellow for medium demand (15 to 30), and green for low demand (fewer than 15). This can be very useful to companies for real-time tracking, analytics, and data migration. Each dashboard visualization has two components: a query, which fetches data from the app and processes it as required, and a configuration, which holds the visualization settings. The vector map and the bar chart each have their own SQL query. The dashboard snapshots show bookings at different locations at one instant and the number of vehicles from each TLC base company.
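The color thresholds can be written as a small function. The sketch treats 15 and 30 as medium demand, which is one reading of “between 15-30”. Adjust the boundaries if your dashboard configuration treats them differently.

```python
def demand_color(bookings_in_window):
    """Color an area by demand: red above 30, yellow from 15 to 30, green below 15."""
    if bookings_in_window > 30:
        return "red"
    if bookings_in_window >= 15:
        return "yellow"
    return "green"

print([demand_color(n) for n in (42, 30, 15, 14)])  # ['red', 'yellow', 'yellow', 'green']
```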


Migrating Data to BigQuery

Finally, the aggregated data along with the source data was migrated to BigQuery. Migration to BigQuery followed the same process of creating a table schema that mirrored the structure of incoming data from Striim.

Flowchart

Below is the UI of cabBookingApp, which tracks bookings within a given time window and returns aggregated demand in a Striim dashboard. The app also streams data from the source to BigQuery.

Here is an overview of each component from the flowchart:

Setting Up the Tracking Application

Step 1: Download the data and sample TQL files from our GitHub repo

You can download the TQL files for the streaming app and the lag monitor app from our GitHub repository. Deploy the Striim app on your Striim server.

Step 2: Configure your CSV source and BigQuery target and add them to the source and target components of the app

You can find the CSV dataset in our GitHub repo. Set up the BigQuery dataset and table that will act as the target for the streaming application.

Step 3: Follow the recipe to create a Striim dashboard for real-time analytics

The recipe details how to set up a Striim dashboard for this use case.

Step 4: Run your app and dashboard

Deploy and run your app and dashboard for real-time tracking and analytics.

Why Striim?

Striim is a single platform for real-time data ingestion, stream processing, pipeline monitoring, and real-time delivery with validation. It uses low-impact Change Data Capture technology to migrate a wide variety of high-volume, high-velocity data from enterprise databases in real time. With Striim’s in-flight data processing and real-time dashboards, companies can get maximum value from streaming data. Enterprises that handle very large data volumes can use Striim to make real-time strategic decisions that maximize profit. Companies like Google, Macy’s, and Gartner use Striim for real-time data migration and analytics. In this data-driven age, generate maximum value for your company with Striim’s CDC-powered platform.

To learn more about Striim for Google BigQuery, check out the related product page. Striim supports many different sources and targets. To see how Striim can help with your move to cloud-based services, schedule a demo with a Striim technologist or download a free trial of the platform.

Tools you need

Striim

Striim’s unified data integration and streaming platform connects clouds, data and applications.

Google BigQuery

BigQuery is a serverless, highly scalable multicloud data warehouse.

Real-Time Point-of-Sale (POS) Analytics with Striim and BigQuery

Tutorial


How to use Striim to generate transaction reports and send alerts when transaction counts are higher or lower than average

Benefits

Detect Anomalies in Real Time
Generate reports on transaction activity and get alerts when transaction counts deviate from the mean.
Manage Inventory Levels
Monitor your stock levels in real time and get notified when stock is low.
Get a Live View of Your Sales
Use POS Analytics to get real-time insights into customer purchases.

Analyze point-of-sale retail data in real-time with Striim, BigQuery, and the BI tool of your choice.

  1. Streaming Change Data Capture – Striim
  2. Streaming Data Pipelines in SQL – Striim
  3. Streaming Data Visualization – Striim
  4. Cloud Data Warehouse with incremental views – BigQuery
  5. Reporting & BI – Metabase

Overview

Before following the instructions below, sign up for Striim Developer to run through this tutorial (no credit card required).

In the web UI, from the top menu, select Apps > View All Apps.

If you don’t see PosApp anywhere on the page (you may need to expand the Samples group), download the TQL from Striim’s GitHub page. Select Create App > Import TQL file, navigate to Striim/Samples/PosApp, double-click PosApp.tql, enter Samples as the namespace, and click Import.

At the bottom right corner of the PosApp tile, select … > Manage Flow. The Flow Designer displays a graphical representation of the application flow:

PosApp Graphical Representation

The following is a simplified diagram of that flow:

Diagram of PosApp

Step 1: Acquire Data

Striim has hundreds of connectors including change data capture from databases such as Oracle, SQLServer, MySQL, and PostgreSQL.

In this example, we’ll use a simple file source. We call this a ‘Source’ component in Striim.

CsvDataSource

Double-clicking CsvDataSource opens it for editing:

CsvDataSource Editor

This is the primary data source for this application. In a real-world application, it would be real-time data. Here, the data comes from a comma-delimited file, posdata.csv. Here are the first two lines of that file:

BUSINESS NAME,MERCHANT ID,PRIMARY ACCOUNT NUMBER,POS DATA CODE,DATETIME,EXP DATE,CURRENCY CODE,AUTH AMOUNT,TERMINAL ID,ZIP,CITY
COMPANY 1,D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu,6705362103919221351,0,20130312173210,0916,USD,2.20,5150279519809946,41363,Quicksand


In Striim terms, each line of the file is an event. An event is in many ways comparable to a row in a SQL database table and can be used in similar ways. Click Show Advanced Settings to see the DSVParser properties:


The True setting for the header property indicates that the first line contains field labels that are not to be treated as data.

The “Output to” stream CsvStream automatically inherits the WAEvent type associated with the CSVReader:

CsvStream output

The only field used by this application is “data”, an array containing the delimited fields.

Step 2: Filter The Data Stream

Filter Data Stream

CsvDataSource outputs the data to CsvStream, which is the input for the query CsvToPosData:

Input for the query CsvToPosData

This CQ converts the comma-delimited fields from the source into typed fields in a stream that other Striim components can consume. Here, “data” refers to the array mentioned above, and the number in brackets specifies a field from the array, counting from zero. Thus data[1] is MERCHANT ID, data[4] is DATETIME, data[7] is AUTH AMOUNT, and data[9] is ZIP.

The TO_STRING, TO_DATEF, and TO_DOUBLE functions cast the fields to the types used in the Output to stream. The DATETIME field from the source is converted into two values. The first is a dateTime value, which the application uses as the event timestamp. The second, produced by the DHOURS function, is an integer hourValue, which is used to look up historical hourly averages from the HourlyAveLookup cache (discussed below).

The other seven fields are discarded. At this point, the first line of data from posdata.csv has been reduced to five values:

  1. D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu (merchantId)
  2. 20130312173210 (DateTime)
  3. 17 (hourValue)
  4. 2.20 (amount)
  5. 41363 (zip)
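The conversion performed by CsvToPosData can be mirrored in Python to check the field indexes against the sample line. This is a sketch, not Striim’s implementation. `to_pos_data` is a hypothetical name, and the DATETIME format (yyyyMMddHHmmss) is inferred from the sample value 20130312173210.

```python
import csv
import io
from datetime import datetime

SAMPLE = """BUSINESS NAME,MERCHANT ID,PRIMARY ACCOUNT NUMBER,POS DATA CODE,DATETIME,EXP DATE,CURRENCY CODE,AUTH AMOUNT,TERMINAL ID,ZIP,CITY
COMPANY 1,D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu,6705362103919221351,0,20130312173210,0916,USD,2.20,5150279519809946,41363,Quicksand
"""


def to_pos_data(data: list[str]) -> dict:
    """Keep only the fields CsvToPosData uses; indexes count from zero."""
    date_time = datetime.strptime(data[4], "%Y%m%d%H%M%S")  # DATETIME
    return {
        "merchantId": data[1],        # MERCHANT ID
        "dateTime": date_time,        # event timestamp
        "hourValue": date_time.hour,  # analogous to DHOURS
        "amount": float(data[7]),     # AUTH AMOUNT
        "zip": data[9],               # ZIP
    }


reader = csv.reader(io.StringIO(SAMPLE))
next(reader)  # header=True: the first line holds field labels, not data
events = [to_pos_data(row) for row in reader]
```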

The CsvToPosData query outputs the processed data to PosDataStream:

CsvToPosData query output

PosDataStream assigns the five remaining fields the names and data types in the order listed above:

  1. MERCHANT ID to merchantId
  2. DATETIME to dateTime
  3. the hour extracted from DATETIME to hourValue
  4. AUTH AMOUNT to amount
  5. ZIP to zip

Step 3: Define the Data Set

PosDataStream passes the data to the window PosData5Minutes:


A window is in many ways comparable to a table in a SQL database, just as the events it contains are comparable to rows in a table. The Mode and Size settings determine how many events the window will contain and how it will be refreshed. With the Mode set to Jumping, this window is refreshed with a completely new set of events every five minutes. For example, if the first five-minute set of events is received from 1:00 pm through 1:05 pm, the next set will cover 1:05 through 1:10, and so on. (If the Mode were set to Sliding, the window would continuously add new events and drop old ones so as to always contain the events of the most recent five minutes.)
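The difference between the two modes can be sketched in Python. This is a simplified model, not Striim’s window implementation. It assumes events are plain timestamps in seconds. A jumping window groups them into consecutive, non-overlapping five-minute batches. A sliding window keeps only the events from the most recent five minutes. The function and class names are hypothetical.

```python
from collections import deque

WINDOW_SECONDS = 5 * 60


def jumping_batches(timestamps, size=WINDOW_SECONDS):
    """Group sorted timestamps into consecutive, non-overlapping windows.

    Window n covers [start + n*size, start + (n+1)*size); empty windows are skipped.
    """
    if not timestamps:
        return []
    start = timestamps[0]
    batches = {}
    for t in timestamps:
        batches.setdefault((t - start) // size, []).append(t)
    return [batches[k] for k in sorted(batches)]


class SlidingWindow:
    """Keep only the events from the most recent `size` seconds."""

    def __init__(self, size=WINDOW_SECONDS):
        self.size = size
        self.events = deque()

    def add(self, t):
        self.events.append(t)
        # Drop events that are `size` seconds or more older than the newest one.
        while self.events and self.events[0] <= t - self.size:
            self.events.popleft()
        return list(self.events)
```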

Step 4: Process and Enhance the Data

The PosData5Minutes window sends each five-minute set of data to the GenerateMerchantTxRateOnly query. As you can see from the following schema diagram, this query is fairly complex:

The GenerateMerchantTxRateOnly query combines data from the PosData5Minutes event stream with data from the HourlyAveLookup cache. A cache is similar to a source, except that the data is static rather than real-time. In the real world, this data would come from a periodically updated table in the payment processor’s system. That table would contain historical averages of the number of transactions processed for each merchant for each hour of each day of the week (168 values per merchant). In this sample application, the source is a file, hourlyData.txt, which (to simplify the sample data set) has only 24 values per merchant, one for each hour of the day.

For each five-minute set of events received from the PosData5Minutes window, the GenerateMerchantTxRateOnly query outputs one event for each merchantID found in the set. It sends these events to MerchantTxRateOnlyStream, which applies the MerchantTxRate type. The easiest way to summarize what is happening in the above diagram is to describe where each of the fields in MerchantTxRateOnlyStream comes from:

merchantId: the merchantID field from PosData5Minutes
    SELECT p.merchantID

zip: the zip field from PosData5Minutes
    SELECT … p.zip

startTime: the dateTime field for the first event for the merchantID in the five-minute set from PosData5Minutes
    SELECT … FIRST(p.dateTime)

count: count of events for the merchantID in the five-minute set from PosData5Minutes
    SELECT … COUNT(p.merchantID)

totalAmount: sum of amount field values for the merchantID in the five-minute set from PosData5Minutes
    SELECT … SUM(p.amount)

hourlyAve: the hourlyAve value for the current hour from HourlyAveLookup, divided by 12 to give the five-minute average
    SELECT … l.hourlyAve/12 …
    WHERE … p.hourValue = l.hourValue

upperLimit: the hourlyAve value for the current hour from HourlyAveLookup, divided by 12, then multiplied by 1.15 if that value is over 10,000, 1.2 if it is between 801 and 10,000, 1.25 if it is between 201 and 800, or 1.5 if it is 200 or less
    SELECT … l.hourlyAve/12 * CASE
        WHEN l.hourlyAve/12 > 10000 THEN 1.15
        WHEN l.hourlyAve/12 > 800 THEN 1.2
        WHEN l.hourlyAve/12 > 200 THEN 1.25
        ELSE 1.5 END

lowerLimit: the hourlyAve value for the current hour from HourlyAveLookup, divided by 12, then divided by 1.15 if that value is over 10,000, 1.2 if it is between 801 and 10,000, 1.25 if it is between 201 and 800, or 1.5 if it is 200 or less
    SELECT … l.hourlyAve/12 / CASE
        WHEN l.hourlyAve/12 > 10000 THEN 1.15
        WHEN l.hourlyAve/12 > 800 THEN 1.2
        WHEN l.hourlyAve/12 > 200 THEN 1.25
        ELSE 1.5 END

category, status: placeholders for values to be added later
    SELECT … '<NOTSET>'
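The per-merchant aggregation and limit calculation can be approximated in Python. This is a sketch only, under several assumptions. The event dictionaries are hypothetical. The `hourly_ave` dictionary stands in for the HourlyAveLookup cache and is assumed to be keyed by merchant and hour. The function names are also hypothetical. The tolerance factors follow the CASE expressions above.

```python
def tolerance(five_min_ave: float) -> float:
    """Factor from the TQL CASE: busier merchants get a tighter band."""
    if five_min_ave > 10000:
        return 1.15
    if five_min_ave > 800:
        return 1.2
    if five_min_ave > 200:
        return 1.25
    return 1.5


def merchant_tx_rates(window_events, hourly_ave):
    """Summarize one five-minute window per merchant.

    window_events: dicts with merchantId, zip, dateTime, hourValue, amount.
    hourly_ave: (merchantId, hourValue) -> historical hourly average (assumed key).
    """
    groups = {}  # dicts keep insertion order, so the first event comes first
    for e in window_events:
        groups.setdefault(e["merchantId"], []).append(e)

    rows = []
    for merchant_id, events in groups.items():
        first = events[0]
        ave = hourly_ave[(merchant_id, first["hourValue"])] / 12  # five-minute average
        factor = tolerance(ave)
        rows.append({
            "merchantId": merchant_id,
            "zip": first["zip"],
            "startTime": first["dateTime"],                   # FIRST(p.dateTime)
            "count": len(events),                             # COUNT(p.merchantID)
            "totalAmount": sum(e["amount"] for e in events),  # SUM(p.amount)
            "hourlyAve": ave,
            "upperLimit": ave * factor,
            "lowerLimit": ave / factor,
            "category": "<NOTSET>",
            "status": "<NOTSET>",
        })
    return rows
```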

The MerchantTxRateOnlyStream passes this output to the GenerateMerchantTxRateWithStatus query, which populates the category and status fields by evaluating the count, upperLimit, and lowerLimit fields:

SELECT merchantId,
  zip,
  startTime,
  count,
  totalAmount,
  hourlyAve,
  upperLimit,
  lowerLimit,
    CASE
      WHEN count > 10000 THEN 'HOT'
      WHEN count > 800 THEN 'WARM'
      WHEN count > 200 THEN 'COOL'
      ELSE 'COLD' END,
    CASE
      WHEN count > upperLimit THEN 'TOOHIGH'
      WHEN count < lowerLimit THEN 'TOOLOW'
      ELSE 'OK' END
FROM MerchantTxRateOnlyStream

The category values are used by the Dashboard to color-code the map points. The status values are used by the GenerateAlerts query.
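The two CASE expressions can be checked against the sample alerts in Step 6. This is a minimal Python sketch with hypothetical function names. The limits in the usage lines come from the quoted alerts: a five-minute average of 15,300 with the 1.15 factor.

```python
def category(count: int) -> str:
    """Size bucket the dashboard uses to color-code map points."""
    if count > 10000:
        return "HOT"
    if count > 800:
        return "WARM"
    if count > 200:
        return "COOL"
    return "COLD"


def status(count: int, upper_limit: float, lower_limit: float) -> str:
    """Compare a window's count with the merchant's expected band."""
    if count > upper_limit:
        return "TOOHIGH"
    if count < lower_limit:
        return "TOOLOW"
    return "OK"


upper, lower = 15300 * 1.15, 15300 / 1.15  # about 17595 and 13304.35
print(category(12012), status(12012, upper, lower))  # HOT TOOLOW
print(category(15853), status(15853, upper, lower))  # HOT OK
```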

The output from the GenerateMerchantTxRateWithStatus query goes to MerchantTxRateWithStatusStream.

Step 5: Populate the Dashboard

The GenerateWactionContent query enhances the data from MerchantTxRateWithStatusStream with the merchant’s company, city, state, and zip code, plus the latitude and longitude used to position the merchant on the map. It then populates the MerchantActivity WActionStore:

In a real-world application, the data for the NameLookup cache would come from a periodically updated table in the payment processor’s system, but the data for the ZipLookup cache might come from a file such as the one used in this sample application.
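The enrichment step amounts to two keyed lookups per event. This is a Python sketch under stated assumptions. The caches are modeled as in-memory dictionaries. The NameLookup cache is assumed to be keyed by merchantId, and the ZipLookup cache by zip code. The output field names are hypothetical.

```python
def enrich(event: dict, name_lookup: dict, zip_lookup: dict) -> dict:
    """Add company name and location fields to a MerchantTxRateWithStatus event.

    Missing keys yield None rather than dropping the event. This is an assumption;
    the join in the real GenerateWactionContent query may behave differently.
    """
    company = name_lookup.get(event["merchantId"])
    place = zip_lookup.get(event["zip"], {})
    return {
        **event,
        "companyName": company,
        "city": place.get("city"),
        "state": place.get("state"),
        "latVal": place.get("latVal"),
        "longVal": place.get("longVal"),
    }
```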

When the application finishes processing all the test data, the WActionStore will contain 423 WActions, one for each merchant. Each WAction includes the merchant’s context information (MerchantId, StartTime, CompanyName, Category, Status, Count, HourlyAve, UpperLimit, LowerLimit, Zip, City, State, LatVal, and LongVal). It also includes all events for that merchant from MerchantTxRateWithStatusStream (merchantId, zip, startTime, count, totalAmount, hourlyAve, upperLimit, lowerLimit, category, and status for each of 40 five-minute blocks). This data is used to populate the dashboard, as detailed in PosAppDash.

Step 6: Trigger Alerts

MerchantTxRateWithStatusStream sends the detailed event data to the GenerateAlerts query, which triggers alerts based on the Status value:

When a merchant’s status changes to TOOLOW or TOOHIGH, Striim will send an alert such as, “WARNING – alert from Striim – POSUnusualActivity – 2013-12-20 13:55:14 – Merchant Urban Outfitters Inc. count of 12012 is below lower limit of 13304.347826086958.” The “raise” value for the flag field instructs the subscription not to send another alert until the status returns to OK.

When the status returns to OK, Striim will send an alert such as, “INFO – alert from Striim – POSUnusualActivity – 2013-12-20 14:02:27 – Merchant Urban Outfitters Inc. count of 15853 is back between 13304.347826086958 and 17595.0.” The “cancel” value for the flag field instructs the subscription to send an alert the next time the status changes to TOOLOW or TOOHIGH. See Sending alerts from applications for more information on info, warning, raise, and cancel.
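The raise/cancel behavior works as a small state machine. A merchant triggers one alert when it leaves the OK band. No further alerts are sent while it stays out of band. One info alert is sent when it returns to OK. The Python sketch below models that de-duplication logic, not Striim’s alert API. The function name and tuple layout are hypothetical.

```python
def alerts_for(statuses):
    """Return (level, flag, status) for each status change that should alert.

    statuses: successive status values for a single merchant.
    """
    raised = False
    out = []
    for s in statuses:
        if s in ("TOOLOW", "TOOHIGH") and not raised:
            out.append(("warning", "raise", s))  # first out-of-band window
            raised = True
        elif s == "OK" and raised:
            out.append(("info", "cancel", s))    # back within limits
            raised = False
    return out
```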

Step 7: Stream data to BigQuery

Now you can stream the enriched point-of-sale analytical data to cloud data platforms such as BigQuery, Snowflake, and Redshift. In this example, we’re going to stream the data to BigQuery for storage and analysis.

Striim can stream data into BigQuery in real-time while optimizing costs with partition pruning on merge operations. After the data is loaded into BigQuery, your team can analyze it with your business intelligence tool of choice. In this example we’re generating the reports in Metabase.

Get Started

Try this recipe yourself for free by signing up for a trial, or talk to our sales team for hands-on guidance. Striim can be deployed on your laptop, in a Docker container, or directly on your cloud service provider of choice.

Tools you need

Striim

Striim’s unified data integration and streaming platform connects clouds, data and applications.

Google BigQuery

BigQuery is a serverless, highly scalable multicloud data warehouse.

BI Tool (Metabase)

Metabase is an open source business intelligence tool.

Streaming Data Integration: Using CDC to Stream Database Changes

Tutorial


How to use the PostgreSQL CDC (PostgreSQL Reader) with a Striim Target

Benefits

Get a Live View
Use Striim CDC to stream data for a continuous view of your transactional data.
Empower Your Teams
Give teams across your organization a real-time view into your database transactions.
React in Real Time
React to business events as they happen, not minutes or hours later.

Overview

This is the first of a two-part blog series on how to use Striim to stream database changes to Apache Kafka. Striim offers continuous, real-time data ingestion from databases and other sources; transformation and enrichment using streaming SQL; delivery of data to multiple targets in the cloud or on-premises; and visualization of results. In this part, we will use Striim’s low-impact, real-time change data capture (CDC) feature to stream database changes (inserts, updates, and deletes) from an operational database into Striim.

What is Change Data Capture?

Databases maintain change logs that record all changes made to the database contents and metadata. These change logs can be used for database recovery in the event of a crash, and also for replication or integration.

Striim Data Flow CDC Change Log

With Striim’s log-based CDC, new database transactions (inserts, updates, and deletes) are read from the source databases’ change logs and turned into a stream of events without impacting the database workload. Striim offers CDC for Oracle, SQL Server, HPE NonStop, MySQL, PostgreSQL, MongoDB, and MariaDB.
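Conceptually, log-based CDC turns each committed change into an event that downstream consumers can replay in order. The Python sketch below illustrates that idea with an in-memory replica. The event shape (operation, primary key, new row image) is a simplification chosen for illustration. It is not the format of Striim’s WAEvent, and the names are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ChangeEvent:
    op: str               # "INSERT", "UPDATE", or "DELETE"
    key: int              # primary key of the affected row
    row: Optional[dict]   # new row image; None for deletes


def apply_changes(replica: dict, events: list) -> dict:
    """Apply change events in log order to keep a replica in sync."""
    for ev in events:
        if ev.op in ("INSERT", "UPDATE"):
            replica[ev.key] = ev.row
        elif ev.op == "DELETE":
            replica.pop(ev.key, None)
        else:
            raise ValueError(f"unknown operation {ev.op!r}")
    return replica
```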

Why use Striim’s CDC?

Businesses use Striim’s CDC capabilities to feed real-time data to their big data lakes, cloud databases, and enterprise messaging systems such as Kafka, for timely operational decision making. They also use CDC to migrate from on-premises databases to cloud environments without downtime, and to keep cloud-based analytics environments up to date with on-premises databases.

How to use Striim’s CDC?

Striim’s easy-to-use CDC template wizards automate the creation of applications that use change data capture to stream events, as they are created, from various source systems to various targets. You can modify apps created with templates in the Flow Designer, or by exporting the TQL, editing it, and importing the modified TQL. Striim has templates for many source-target combinations.

In addition, Striim offers pre-built integration applications for bulk loading and CDC from PostgreSQL source databases to target systems, including PostgreSQL databases, Kafka, and files. You can start these applications in seconds by going to the Applications section of the Striim platform.


Striim pre-built sample integration applications.

In this post, we will show how to use the PostgreSQL CDC (PostgreSQL Reader) with a Striim Target, using the wizards to build a custom application instead of the pre-built application mentioned above. The instructions below assume that you are using the PostgreSQL instance that comes with the Striim platform. If you are using your own PostgreSQL database instance, please review our instructions on how to set up PostgreSQL for CDC.

Step 1: Using the CDC Template

To start building the CDC application, in the Striim web UI, go to the Apps page and select Add App > Start with Template. Enter PostgreSQL in the search field to narrow down the sources and select “PostgreSQL Reader to Striim”.

Wizard template selection when creating a new app.

Next enter the name and namespace for your application (the namespace is a way of grouping applications together).

Step 2: Specifying the Data Source Properties

On the SETUP POSTGRESQL READER page, specify the data source and table properties:

  • the connection URL, username, and password.
  • the tables for which you want to read change data.

Configuring the data source in the wizard.

After you complete this step, your application will open in the Flow Designer.


The wizard generates a data flow.

In the flow designer, you can add various processors, enrichers, transformers, and targets as shown below to complete your pipeline, in some cases with zero coding.


Flow designer enrichers and processors.


Flow designer event transformers and targets.

In the next blog post, we will discuss how to add a Kafka target to this data pipeline. In the meantime, please feel free to request a demo with one of our lead technologists, tailored to your environment.

Tools you need

Striim

Striim’s unified data integration and streaming platform connects clouds, data and applications.

PostgreSQL

PostgreSQL is an open-source relational database management system.

Migrating from MySQL to Google Cloud SQL with Change Data Capture

Tutorial


How to use Change Data Capture (CDC) to synchronize data from MySQL into a Google Cloud SQL instance

Benefits

Simplify Cloud Migrations
Say goodbye to downtime and complex migrations. Striim seamlessly loads and syncs your changing data.
Add New Cloud Applications
Add new, client-facing applications by synchronizing an existing on-premises application’s data set.
Sync Current and New Databases
Keep data in your current MySQL instance in sync with your new Cloud SQL deployment until your migration goes live.

Overview

Migrating from MySQL to Google Cloud SQL opens up cloud services that offer a wealth of capabilities with low management overhead and cost. However, moving your existing on-premises applications to the cloud can be a challenge, because those applications are built on top of on-premises deployments of databases like MySQL. In this blog post, we are going to use a database technology called Change Data Capture to synchronize data from MySQL into a Google Cloud SQL instance.

Introduction

One of the major hurdles when migrating applications, whether you’re changing the technology or moving to the cloud, is migrating your data. The older and bigger the application, the more difficult that migration becomes. Traditional Extract, Transform, and Load (ETL) tools require multiple passes and, potentially, significant downtime to handle data migration activities. This is where real-time ETL tools like Striim shine.
There are a number of benefits to migrating applications this way, such as being able to:
  • Add a new, client-facing cloud application by synchronizing an existing, traditionally on-premises application’s data set.
  • Migrate one or more on-premises applications (with data) to the cloud for production testing with almost zero impact on the existing application.
Let’s walk through an example of connecting an on-premises instance of MySQL to Google Cloud SQL for MySQL.

Step 1: Set Up the MySQL Database

Before we dive into Striim, we assume that you already have an on-premises MySQL instance configured and containing relevant data. For the purpose of this post, we have loaded data from a GitHub source (https://github.com/datacharmer/test_db) into a local MySQL instance. The data set is fairly large, which is perfect for our purposes, and contains a dummy set of employee information, including salaries.

MySQL to CloudSQL

Rather than importing all the data this data set contains, I’ve excluded the load_salaries2.dump and load_salaries3.dump files. This will allow us to insert a lot of data after Striim has been configured to show how powerful Change Data Capture is.

Step 2: Set Up the Striim Application

Now that we have an on-premises data set in MySQL, let’s set up a new Striim application on Google Cloud Platform to act as the migration service.
Open your Google Cloud console and open or start a new project. Go to the marketplace and search for Striim.


A number of options should return, but the one we’re after is the first item, which allows integration of real-time data to GCP.


Select this option and start the deployment process by pressing the deploy button at the bottom of this screen. For this tutorial, we’ll use the basic defaults for a Striim server. In production, however, you’d need to size appropriately depending on your load.

Step 3: Create a Target Database

While we wait for the Striim server to deploy, let’s create the Google Cloud SQL database to which we’ll migrate our data. Select the SQL option from the side menu in Google Cloud and create a new MySQL instance.


Once again, we’ll use the defaults for a basic Google MySQL instance. Open the instance and copy the instance connection name for use later. Then open the database instance and take note of the IP address.
We also need to create the database structure for the data we imported into the local MySQL instance. To do this, open the Google Cloud shell, log into the MySQL server, and run the SQL to create the table structure. Striim also needs a checkpoint table to keep the state in the event of failures, so create that table structure using the following:

CREATE TABLE CHKPOINT (
    id VARCHAR(100) PRIMARY KEY,
    sourceposition BLOB,
    pendingddl BIT(1),
    ddl LONGTEXT
);
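A checkpoint table matters because a restarted writer needs to know the last source position it applied. The Python sketch below shows that resume logic using the same CHKPOINT columns in an in-memory SQLite database. Several details are assumptions for illustration only: the integer encoding of the position in the sourceposition BLOB, the app id, and the helper names. Striim manages the contents of this table itself.

```python
import sqlite3


def open_db():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """CREATE TABLE CHKPOINT (
               id VARCHAR(100) PRIMARY KEY,
               sourceposition BLOB,
               pendingddl BIT(1),
               ddl LONGTEXT
           )"""
    )
    return conn


def save_position(conn, app_id: str, position: int) -> None:
    """Record the last applied source position (hypothetical integer encoding)."""
    conn.execute(
        "INSERT OR REPLACE INTO CHKPOINT (id, sourceposition, pendingddl, ddl) "
        "VALUES (?, ?, 0, NULL)",
        (app_id, position.to_bytes(8, "big")),
    )
    conn.commit()


def load_position(conn, app_id: str) -> int:
    """Return the checkpointed position, or -1 if none has been saved yet."""
    row = conn.execute(
        "SELECT sourceposition FROM CHKPOINT WHERE id = ?", (app_id,)
    ).fetchone()
    return int.from_bytes(row[0], "big") if row else -1


def resume(events, last_applied: int):
    """After a restart, skip (position, change) pairs already applied."""
    return [(pos, change) for pos, change in events if pos > last_applied]
```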

Step 4: Initial Load Application

Open the Google Console and go back to the Deployment Manager, and click “Visit site”. It’s important to note that the Striim VM currently has a dynamic external IP address. In a production environment, you’ll want to set this to static so it won’t change.
When you first visit the site, you’ll see a congratulations screen. Click accept and fill in the basic details. Leave the license field blank for the trial version of Striim, or add your license key if you have one.
The first thing we need to do is create an application that performs the initial load of our current data set. There is no wizard for the kind of initial load application we need, so go to Apps and create an app from scratch.


First, let’s add a MySQL reader from the sources tab on the left. This will access our local database to load the initial set of data. To read from a local server we need to use a JDBC style URL using the template:
jdbc:mysql://:/
We are also mapping the tables we want to sync by specifying them in the tables folder using

.

This allows us to restrict what is synchronized. Finally, under output to, specify a new WAEvent type for this connector.


Once we have our source wired up, we need to add a target to the flow so our data starts to transfer. Using a process similar to the one we used previously, add the GoogleCloudWriter target with the Google cloud instance in the connection URL. For the tables, this time we need to match the source and targets together using the form:
.,.
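The following is a sketch of how these connection values might be assembled. It assumes MySQL Connector/J’s standard URL layout (jdbc:mysql://host:port/database). It also assumes that source and target tables are written as schema.table names, with each source and target separated by a comma and each pair separated by a semicolon. Check the Striim documentation for the exact Tables syntax. The host, port, database, and table names used with these helpers are hypothetical.

```python
def mysql_jdbc_url(host: str, port: int = 3306, database: str = "") -> str:
    """Build a MySQL Connector/J style URL: jdbc:mysql://host:port/database."""
    return f"jdbc:mysql://{host}:{port}/{database}"


def table_pairs(mapping: dict) -> str:
    """Join source -> target tables as 'src,tgt' pairs separated by ';' (assumed format)."""
    return ";".join(f"{src},{tgt}" for src, tgt in mapping.items())
```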



Once both the source and target connectors have been configured, deploy and start the application to begin the initial load process.



After the application has started, click the Monitor button to see the application’s performance. The load will take a little while to complete, depending on your database size.

Step 5: Change Data Capture

While the initial load takes place, let’s create the Change Data Capture (CDC) application to get ready for the synchronization process.

This time we are going to use a wizard to create the application. Click on Add Apps, then select the option to start with a Template. Striim comes with a lot of templates for different use cases out of the box. Scroll down to Streaming Integration for MySQL, click “show more,” then look for MySQL CDC to Cloud SQL MySQL. This option sets up a CDC application for MySQL to Google Cloud SQL.

Fill out the connection information for your on-premises application and click next. This should connect to the agent and ensure everything is correct.



Once everything is connected, check the tables you selected in the first application. These will synchronize any changes that occur.



Now we need to link our source to our target. Specify the connection details for your Google SQL instance using the IP address from the previous step. Fill in the username, password, and list of tables from the source database and click next. When you’ve finished the wizard, the application should be ready to go.



If the previous data load application has finished, stop the data load application and start the Change Data Capture application. Once the application has started, start loading transactions into your on-premises database. This should start synchronizing the data that changes up to your Google Cloud instance.

Open the Change Data Capture application and select monitor. You should see both the input and output figures as the application keeps track of your on-premises database. The activity chart should be showing the throughput of the records synchronizing from one location to another.
If you open the database console in Google Cloud and run a “SELECT COUNT(salary) FROM salaries” statement a couple of times, you should see the count figure rising.

Step 6: Adding More Load

While the servers are synchronizing, let’s go back to our local MySQL and add some other transactions. Import the remaining two salaries files, load_salaries2.dump and load_salaries3.dump. This will provide additional transactions to be synchronized and you’ll see Striim continue to add transactions as they happen without needing to do anything else.

Next Steps

We looked at a really quick and easy way to synchronize an on-premises instance of MySQL to Google Cloud SQL using Striim. At this point, you could start using the cloud database to run additional applications or do data analysis — without affecting the performance and use of your existing system.
If you open the menu on the Striim admin page, then open the apps section, and finally open this application, you’ll also see other steps you could add to this flow that support even more complex use cases, such as adding in transforms, combining multiple sources, or even splitting across targets.
To learn more about migrating from MySQL to Google Cloud SQL, check out the product page. To see how Striim can help with your move to cloud-based services, schedule a demo with a Striim technologist, or download a free trial of the platform.

Tools you need

Striim

Striim’s unified data integration and streaming platform connects clouds, data and applications.

MySQL

MySQL is an open-source relational database management system.

Google Cloud SQL

Google Cloud SQL is a fully managed relational database service for MySQL, PostgreSQL and SQL Server.

Migrate and Replicate Data from SQL Server to Snowflake with Striim

Tutorial


How to use Striim to migrate schemas and data from an existing SQL Server database into Snowflake

Benefits

Operational Analytics
Analyze your data in real time without impacting the performance of your operational database.
Control Your Costs
Move data to Snowflake incrementally while controlling upload and merge intervals to optimize compute costs.
Get a Live View
Use Striim CDC to stream data to Snowflake for a continuous view of your SQL Server transactions.

What is Striim?

Striim is a next-generation cloud data integration product that offers change data capture (CDC), enabling continuous replication from popular databases such as Oracle, SQL Server, PostgreSQL, and many others.

In addition to CDC connectors, Striim has hundreds of automated adapters for file-based data (logs, xml, csv), IoT data (OPCUA, MQTT), and applications such as Salesforce and SAP. Our SQL-based stream processing engine makes it easy to enrich and normalize data before it’s written to Snowflake.

In this tutorial, we’ll show you how to use Striim to migrate schemas and data from an existing SQL Server database into Snowflake.

Step 1: Prepare a Snowflake Database and Launch Striim

Before migrating your data from SQL Server, you must first create a database within Snowflake to store the migrated data. After that database has been created you can launch Striim as a Snowflake partner service directly from within Snowflake.

Follow the steps below to prepare a database and launch Striim in Snowflake:

  1. Launch Snowflake in a web browser.
  2. Click on Databases > Create.
  3. Enter a unique name for the database and click Finish.
  4. Click on Partner Connect in the top right corner of the navigation bar.
  5. Locate and click on Striim in the list of Snowflake partners. Note: you may need to first switch your user role to ACCOUNTADMIN in order to launch Striim from Snowflake.
  6. Activate the partner account if it has not been activated before.
  7. Confirm that the database you created in steps 2 and 3 above is listed in Database(s) with the USAGE privilege granted, and click Connect.

Note: On subsequent launches, after activation has been completed for the first time, Snowflake will just prompt you to launch.



Step 2: Create a Striim Service to Host a Data Migration App

In Striim, the data migration is performed by an app. Before you can create that app, you need to create and configure a service to host it.

Follow the steps below to create a new Striim service:

Click on Marketplace in the top menu.

Locate the Snowflake app and click on Create:



Enter a unique name in the Name field, noting the naming requirements listed:



(Optional) Click Show advanced options and specify the Service Version and Cluster Type.

Click Create. The browser will redirect to the Services screen.

Wait for the new service to enter the Running state.

Click on Launch:



The service will open in a new browser tab.

Step 3: Create a Data Migration App on the Striim Service

With the service now created and launched, you must create an app that runs on that service to perform the data migration.

Follow the steps below to create a new data migration app:

Click on Apps to display the app management screen:



Click Create app:



Click on SQL Server Database to Snowflake:



Enter a name and a namespace for the new application, and click Save:



The data migration wizard is displayed:



Step 4: Prepare for Data Migration to Snowflake

In this section you will configure your app to access your source SQL Server database. As you proceed through Striim’s migration wizard, Striim will validate that it can access and fetch the metadata and data of your source SQL Server database.

Follow the steps below to migrate data using Striim’s step-by-step wizard:

Enter the details of your existing SQL Server database from which data is to be migrated and click Next:



Striim will verify that it can connect to your database and obtain metadata:

Click Next to advance to the Select Schemas screen.

Select the schemas to migrate from your SQL Server database to Snowflake and click Next:



Striim will fetch and validate metadata for each table in your database:



Click Next to advance to the Select Tables screen. Navigate through each schema on the left-hand side, and select the tables from each to migrate:



Click Next to complete the wizard. The target creation screen is displayed:



Step 5: Prepare Your Target and Migrate Your Data to Snowflake

Now that Striim can read from your source SQL Server database, you must configure Striim to write to your target Snowflake database.

Follow the steps below to prepare a Snowflake target and start the migration process:

Enter a unique name for the target in the Target Name field on the Create Snowflake Target(s) screen.

Ensure Input From is set to the stream you created using the steps in the previous sections. Note that the stream name ends in _OutputStream.

Prepare the URL of the target Snowflake database: copy the following URL into the Connection URL field, replacing YOUR_HOST with the base host domain assigned by Snowflake to your account and YOUR_DATABASE_NAME with the name of your database:


jdbc:snowflake://YOUR_HOST.snowflakecomputing.com/?db=YOUR_DATABASE_NAME&schema=public

For example, the following URL has a base host domain of xr86987.ca-central-1.aws and the database name set to RNA:


jdbc:snowflake://xr86987.ca-central-1.aws.snowflakecomputing.com/?db=RNA&schema=public
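If you generate this setting from a script, the URL can be assembled from its two variable parts. Here is a minimal Python sketch; the function name snowflake_jdbc_url is hypothetical and simply reproduces the URL template above, defaulting the schema to public as the template does.

```python
from urllib.parse import urlencode

SNOWFLAKE_SUFFIX = ".snowflakecomputing.com"


def snowflake_jdbc_url(host: str, database: str, schema: str = "public") -> str:
    """Build a Snowflake JDBC URL following the template used in this step.

    host     -- the base host domain assigned by Snowflake, e.g. "xr86987.ca-central-1.aws"
    database -- the name of the target Snowflake database
    schema   -- the target schema ("public" in the template)
    """
    # Accept a host that was pasted with the Snowflake domain suffix already attached.
    if host.endswith(SNOWFLAKE_SUFFIX):
        host = host[: -len(SNOWFLAKE_SUFFIX)]
    # urlencode keeps the dict's insertion order: db first, then schema.
    query = urlencode({"db": database, "schema": schema})
    return f"jdbc:snowflake://{host}{SNOWFLAKE_SUFFIX}/?{query}"


print(snowflake_jdbc_url("xr86987.ca-central-1.aws", "RNA"))
# jdbc:snowflake://xr86987.ca-central-1.aws.snowflakecomputing.com/?db=RNA&schema=public
```

Using urlencode rather than plain string concatenation means a database name containing special characters is percent-encoded instead of breaking the query string.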

Enter the credentials for your Snowflake account in the Username and Password fields.

(Optional) Modify which tables to migrate by editing the table name(s) listed in the Tables field. By default, this field lists the tables you selected in the previous section, using % as a wildcard character.
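To preview which tables a pattern in the Tables field would select, here is a small Python sketch. It assumes % behaves like the SQL LIKE wildcard (any run of zero or more characters) and uses hypothetical table names; it is a local approximation, not Striim’s own matching logic.

```python
import re
from typing import List


def matching_tables(pattern: str, tables: List[str]) -> List[str]:
    """Return the names in `tables` selected by a '%'-wildcard pattern.

    Assumes '%' matches any run of zero or more characters (as in SQL LIKE),
    every other character (including '.') is literal, and matching is
    case-sensitive.
    """
    # Escape the literal pieces, then join them with '.*' wherever '%' appeared.
    regex = re.compile(".*".join(re.escape(part) for part in pattern.split("%")))
    return [name for name in tables if regex.fullmatch(name)]


tables = ["dbo.Orders", "dbo.Customers", "sales.Invoices"]  # hypothetical table names
print(matching_tables("dbo.%", tables))  # ['dbo.Orders', 'dbo.Customers']
```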



Click Next. Striim will recreate the schema(s) in your Snowflake database:



Click Next after target creation is complete. Striim will begin migrating your data to Snowflake and will provide a detailed Application Progress popup showing how the migration is progressing:



Wrapping Up: Start Your Free Trial

Our tutorial showed you how easy it is to migrate data from SQL Server to Snowflake, a leading cloud data warehouse. Once your data has been migrated, Striim enables continuous, real-time updates via change data capture.

Where your source database continues to change during the migration, Striim enables zero-downtime, zero-data-loss migrations to Snowflake.
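One common way to reason about a zero-data-loss cutover is to record a change-log position when the initial load is taken and then apply only the changes committed after it. The Python sketch below illustrates that general technique with hypothetical integer positions and a hypothetical ChangeEvent type; it is not a description of Striim’s internal implementation.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ChangeEvent:
    position: int   # hypothetical, monotonically increasing change-log position
    operation: str  # "INSERT", "UPDATE" or "DELETE"
    key: int        # primary key of the affected row


def changes_after_snapshot(snapshot_position: int, events: List[ChangeEvent]) -> List[ChangeEvent]:
    """Return, in log order, the changes the initial load did not include.

    Assumes the snapshot reflects every change up to and including
    `snapshot_position`: replaying those would apply them twice, while
    skipping anything later would lose data.
    """
    pending = [e for e in events if e.position > snapshot_position]
    return sorted(pending, key=lambda e: e.position)


events = [ChangeEvent(12, "UPDATE", 1), ChangeEvent(8, "INSERT", 1), ChangeEvent(11, "DELETE", 2)]
print([e.position for e in changes_after_snapshot(10, events)])  # [11, 12]
```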

As always, feel free to reach out to our integration experts to schedule a demo, or try Striim for free here.

Tools you need

Striim

Striim’s unified data integration and streaming platform connects clouds, data and applications.

PostgreSQL

PostgreSQL is an open-source relational database management system.

Snowflake

Snowflake is a cloud-native relational data warehouse that offers flexible and scalable architecture for storage, compute and cloud services.

Streaming Data Integration Tutorial: Adding a Kafka Stream to a Real-Time Data Pipeline

Tutorial


Connect your streaming pipelines to Apache Kafka seamlessly for maximum organizational adoption of real-time data

Benefits

Turn Your Database into a Stream
Use non-intrusive CDC to Kafka to create persistent streams that can be accessed by multiple consumers.
Empower Your Teams
Give teams across your organization a real-time view of your Oracle database transactions.
Get Analytics-Ready Data
Get your data ready for analytics before it lands in the cloud. Streaming SQL scales in memory to keep your data moving.

Overview

This is the second post in a two-part blog series discussing how to stream database changes into Kafka. You can read part one here. In this post, we add a Kafka target to the CDC source from the previous post. The application will ingest database changes (inserts, updates, and deletes) from the PostgreSQL source tables and deliver them to Kafka to continuously update a Kafka topic.
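To make the insert, update, and delete semantics concrete, here is a minimal Python sketch of how a downstream consumer might replay such change events, in order, to reconstruct the current state of a source table. The (operation, key, row) event layout is a hypothetical simplification, not the message format Striim actually writes to Kafka.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical simplified change event: (operation, primary key, column values).
Change = Tuple[str, Any, Optional[Dict[str, Any]]]


def replay(changes: List[Change]) -> Dict[Any, Dict[str, Any]]:
    """Rebuild the current table contents by applying changes in order."""
    table: Dict[Any, Dict[str, Any]] = {}
    for op, key, row in changes:
        if op == "INSERT":
            table[key] = dict(row)
        elif op == "UPDATE":
            # Merge the new column values into the existing row.
            table[key] = {**table.get(key, {}), **row}
        elif op == "DELETE":
            table.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return table


changes = [
    ("INSERT", 1, {"id": 1, "status": "new"}),
    ("INSERT", 2, {"id": 2, "status": "new"}),
    ("UPDATE", 1, {"status": "shipped"}),
    ("DELETE", 2, None),
]
print(replay(changes))  # {1: {'id': 1, 'status': 'shipped'}}
```

Order matters here: applying the same events out of sequence (for example, the delete before the insert) would leave a different final state, which is why CDC pipelines preserve commit order per key.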

What is Kafka?

Apache Kafka is a popular distributed, fault-tolerant, high-performance messaging system.

Why use Striim with Kafka?

The Striim platform enables you to ingest data into Kafka, process it for different consumers, and analyze, visualize, and distribute it to a broad range of systems on-premises and in the cloud, all with an intuitive UI and a SQL-based language for easy and fast development.

Step 1: Add a Kafka Target to a Striim Dataflow

From the Striim Apps page, click on the app that we created in the previous blog post and select Manage Flow.


MyPostgreSQL-CDC App

This will open your application in the Flow Designer.

MyPostgreSQL-CDC app data flow.

To write to Kafka, we need to add a Target component to the dataflow. Click on the data stream, then on the plus (+) button, and select “Connect next Target component” from the menu.


Connecting a target component to the data flow.

Step 2: Enter the Target Info

The next step is to specify how to write data to the target. In the New Target panel, select Kafka Writer Version 0.11.0 from the ADAPTER drop-down, then enter a few connection properties, including the target name, topic, and broker URL.


Configuring the Kafka target.
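Kafka broker addresses are conventionally written as a comma-separated list of host:port pairs. As a hedged sketch, the hypothetical Python helper below checks such a string before you paste it into the broker URL field; whether the Kafka Writer accepts exactly this syntax, including multiple brokers, is an assumption here.

```python
from typing import List, Tuple


def parse_brokers(spec: str) -> List[Tuple[str, int]]:
    """Split 'host1:9092,host2:9092' into (host, port) pairs, validating each port."""
    brokers = []
    for entry in spec.split(","):
        # rpartition splits on the last ':' so bracketed IPv6 hosts keep their colons.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port_num = int(port)
        if not 0 < port_num < 65536:
            raise ValueError(f"port out of range in {entry!r}")
        brokers.append((host, port_num))
    return brokers


print(parse_brokers("localhost:9092"))  # [('localhost', 9092)]
```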

Step 3: Data Formatting

Different Kafka consumers may have different requirements for the data format. When writing to Kafka in Striim, you can choose the data format with the FORMATTER drop-down and optional configuration properties. Striim supports JSON, delimited, XML, Avro, and free-text formats; in this case, we select the JSONFormatter.


Configuring the Kafka target FORMATTER
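To illustrate why the format choice matters to consumers, the following Python sketch renders one record as JSON and as a delimited line using only the standard library. The record and its field names are hypothetical, and the output is not byte-for-byte what Striim’s JSONFormatter or its delimited formatter produce.

```python
import csv
import io
import json

# Hypothetical record; field names are illustrative only.
record = {"id": 42, "name": "Alice", "city": "Austin, TX"}


def to_json(rec: dict) -> str:
    """Serialize a record as a single JSON object (keys in insertion order)."""
    return json.dumps(rec)


def to_delimited(rec: dict, delimiter: str = ",") -> str:
    """Serialize a record's values as one delimited line, quoting fields as needed."""
    buf = io.StringIO()
    csv.writer(buf, delimiter=delimiter, lineterminator="").writerow(rec.values())
    return buf.getvalue()


print(to_json(record))       # {"id": 42, "name": "Alice", "city": "Austin, TX"}
print(to_delimited(record))  # 42,Alice,"Austin, TX"
```

The JSON form is self-describing, so a consumer can read fields by name; the delimited form is more compact but relies on the consumer knowing the column order, and a value containing the delimiter has to be quoted.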

Step 4: Deploying and Starting the Data Flow

The resulting data flow can now be modified, deployed, and started through the UI. To run the application, you must first deploy it: click on the ‘Created’ drop-down and select ‘Deploy App’ to show the Deploy UI.


Deploying the app

The application can be deployed to all nodes, any one node, or predefined groups in a Striim cluster; the default is the least used node.


Deployment node selection
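The default placement can be pictured as picking the node with the lightest load. The Python sketch below only illustrates that idea: the node names are hypothetical, and measuring “least used” as the number of deployed apps is an assumption, since the actual criteria Striim uses are not described here.

```python
from typing import Dict


def least_used_node(deployed_apps: Dict[str, int]) -> str:
    """Return the node with the fewest deployed apps.

    Ties are broken by node name so the choice is deterministic.
    """
    if not deployed_apps:
        raise ValueError("cluster has no nodes")
    return min(deployed_apps, key=lambda node: (deployed_apps[node], node))


cluster = {"node-a": 3, "node-b": 1, "node-c": 1}  # hypothetical node -> app count
print(least_used_node(cluster))  # node-b
```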

After deployment, the application is ready to start: select Start App.


Starting the app

Step 5: Testing the Data Flow

You can use the PostgreSQL to Kafka sample integration application to insert, delete, and update rows in the PostgreSQL CDC source table. You should then see data flowing in the UI, indicated by a number of msgs/s. (Note that the messages are sent quickly, so the rate soon returns to 0.)


Testing the streaming data flow

If you now click on the data stream in the middle and then on the eye icon, you can preview the data flowing between PostgreSQL and Kafka. Here you can see the data, the metadata (these are all updates), and the before values (what the data was before the update).


Previewing the data flowing from PostgreSQL to Kafka
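Because each update carries both the new values and the before values, a consumer can work out exactly which columns changed. Below is a minimal Python sketch assuming a JSON message with data and before objects, mirroring the preview fields described above; the sample message, its table, and its field names are hypothetical, and the exact layout of Striim’s JSON output may differ.

```python
import json
from typing import Any, Dict, Tuple

# Hypothetical update message mirroring the preview fields (data, metadata, before).
message = """
{
  "metadata": {"OperationName": "UPDATE"},
  "data":   {"id": 7, "email": "new@example.com", "tier": "gold"},
  "before": {"id": 7, "email": "old@example.com", "tier": "gold"}
}
"""


def changed_columns(msg: str) -> Dict[str, Tuple[Any, Any]]:
    """Map each column whose value differs to its (before, after) pair."""
    event = json.loads(msg)
    before = event.get("before") or {}
    after = event.get("data") or {}
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }


print(changed_columns(message))  # {'email': ('old@example.com', 'new@example.com')}
```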

There are many other sources and targets that Striim supports for streaming data integration. Please request a demo with one of our lead technologists, tailored to your environment.

