Data is increasingly siloed, making it harder for companies to extract the most value from it. This is compounded by the fact that over 90% of companies plan on having hybrid cloud and/or multi-cloud operations by 2022.
Watch this on-demand webinar with James Serra (Data Platform Architecture Lead at EY) where we demystify building a multi-cloud data environment for operations and real-time analytics. We cover the following topics:
Pros and cons of multi-cloud vs doubling down on a single cloud
Enterprise data patterns such as Data Fabric, Data Mesh, and The Modern Data Stack
Data ingestion and data transformation in a multi-cloud/hybrid cloud environment
Comparison of data warehouses (Snowflake, Synapse, Redshift, BigQuery) for real-time workloads
Oracle Change Data Capture – An Event-Driven Architecture for Cloud Adoption
How to replace batch ETL with event-driven distributed stream processing
Benefits
Operational Analytics
Use non-intrusive CDC to Kafka to create persistent streams that can be accessed by multiple consumers and automatically reflect upstream schema changes.
Empower Your Teams
Give teams across your organization a real-time view of your Oracle database transactions.
Get Analytics-Ready Data
Get your data ready for analytics before it lands in the cloud. Process and analyze in-flight data with scalable streaming SQL.
Overview
All businesses rely on data. Historically, this data resided in monolithic databases, and batch ETL processes were used to move that data to warehouses and other data stores for reporting and analytics purposes. As businesses modernize, looking to the cloud for analytics, and striving for real-time data insights, they often find that these databases are difficult to completely replace, yet the data and transactions happening within them are essential for analytics. With over 80% of businesses noting that the volume & velocity of their data is rapidly increasing, scalable cloud adoption and change data capture from databases like Oracle, SQLServer, MySQL and others is more critical than ever before. Oracle change data capture is specifically one area where companies are seeing an influx of modern data integration use cases.
To resolve this, more and more companies are moving to event-driven architectures, because of the dynamic distributed scalability which makes sharing large volumes of data across systems possible.
In this post we will look at an example that replaces batch ETL with event-driven distributed stream processing: Oracle change data capture events are extracted as they are created; enriched with in-memory, SQL-based denormalization; then delivered to MongoDB to provide scalable, real-time, low-cost analytics, without affecting the source database. We will also look at using the enriched events, optionally backed by Kafka, to incrementally add other event-driven applications or services.
Continuous Data Collection, Processing, Delivery, and Analytics with the Striim Platform
Event-Driven Architecture Patterns
Most business data is produced as a sequence of events, or an event stream: for example, web or mobile app interactions, devices, sensors, bank transactions, all continuously generate events. Even the current state of a database is the outcome of a sequence of events.
Treating state as the result of a sequence of events forms the core of several event-driven patterns.
Event Sourcing is an architectural pattern in which the state of the application is determined by a sequence of events. As an example, imagine that each “event” is an incremental update to an entry in a database. In this case, the state of a particular entry is simply the accumulation of events pertaining to that entry. In the example below the stream contains the queue of all deposit and withdrawal events, and the database table persists the current account balances.
Imagine Each Event as a Change to an Entry in a Database
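The deposit and withdrawal example can be sketched in a few lines of Python. This is only an illustration of the event sourcing idea, not Striim code; the account IDs, amounts, and the `replay` helper are hypothetical.

```python
from collections import defaultdict

# Hypothetical event stream: (account_id, kind, amount), oldest first.
events = [
    ("acct-1", "deposit", 100),
    ("acct-2", "deposit", 50),
    ("acct-1", "withdrawal", 30),
    ("acct-2", "deposit", 25),
]


def replay(events):
    """Fold the event stream into the current balance of each account."""
    balances = defaultdict(int)
    for account_id, kind, amount in events:
        if kind == "deposit":
            balances[account_id] += amount
        elif kind == "withdrawal":
            balances[account_id] -= amount
        else:
            raise ValueError(f"unknown event kind: {kind}")
    return dict(balances)


balances = replay(events)
```

Replaying the same events always yields the same balances, which is why the stream can rebuild the table while the table alone cannot rebuild the stream.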
The events in the stream can be used to reconstruct the current account balances in the database, but not the other way around. Databases can be replicated with a technology called Change Data Capture (CDC), which collects the changes being applied to a source database, as soon as they occur by monitoring its change log, turns them into a stream of events, then applies those changes to a target database. Source code version control is another well known example of this, where the current state of a file is some base version, plus the accumulation of all changes that have been made to it.
The Change Log can be used to Replicate a Database
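As a rough sketch of how a change log replicates a table, the Python below applies insert, update, and delete events to an in-memory replica. The event shape (`op`, `key`, `row`) is an assumption made for illustration and is not the format any particular CDC reader emits.

```python
def apply_change(table, change):
    """Apply one change event to an in-memory table keyed by primary key."""
    op, key = change["op"], change["key"]
    if op in ("INSERT", "UPDATE"):
        # Merge the changed columns into the existing row (if any).
        table[key] = {**table.get(key, {}), **change["row"]}
    elif op == "DELETE":
        table.pop(key, None)
    else:
        raise ValueError(f"unknown operation: {op}")
    return table


# Hypothetical change log, in commit order.
change_log = [
    {"op": "INSERT", "key": 1, "row": {"name": "Ada", "balance": 100}},
    {"op": "INSERT", "key": 2, "row": {"name": "Bob", "balance": 50}},
    {"op": "UPDATE", "key": 1, "row": {"balance": 70}},
    {"op": "DELETE", "key": 2, "row": None},
]

replica = {}
for change in change_log:
    apply_change(replica, change)
```

Applying the changes in commit order is what keeps the replica consistent with the source.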
What if you need to have the same set of data for different databases, for different types of use? With a stream, the same message can be processed by different consumers for different purposes. As shown below, the stream can act as a distribution point, where, following the polyglot persistence pattern, events can be delivered to a variety of data stores, each using the most suited technology for a particular use case or materialized view.
Streaming Events Delivered to a Variety of Data Stores
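The distribution-point idea can be illustrated with a tiny in-process stream in which every subscriber receives every event and maintains its own view. The `Stream` class and the three views are hypothetical stand-ins for real data stores.

```python
class Stream:
    """Minimal in-process stream: every subscriber sees every event."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, handler):
        self._subscribers.append(handler)

    def publish(self, event):
        for handler in self._subscribers:
            handler(event)


latest_by_id = {}   # key-value view: latest event per id
history = []        # append-only view: full event history
count_by_type = {}  # aggregate view: number of events per type


def update_latest(event):
    latest_by_id[event["id"]] = event


def update_counts(event):
    count_by_type[event["type"]] = count_by_type.get(event["type"], 0) + 1


stream = Stream()
stream.subscribe(update_latest)
stream.subscribe(history.append)
stream.subscribe(update_counts)

for event in [
    {"id": 1, "type": "deposit", "amount": 100},
    {"id": 2, "type": "deposit", "amount": 50},
    {"id": 1, "type": "withdrawal", "amount": 30},
]:
    stream.publish(event)
```

Each view is shaped for its own use case, yet all three are derived from the same events.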
Event-Driven Streaming ETL Use Case Example
Below is a diagram of the Event-Driven Streaming ETL use case example:
Event-Driven Streaming ETL Use Case Diagram
Striim’s low-impact, real-time Oracle change data capture (CDC) feature is used to stream database changes (inserts, updates and deletes) from an Operational Oracle database into Striim
CDC Events are enriched and denormalized with Streaming SQL and Cached data, in order to make relevant data available together
Enriched, denormalized events are streamed to MongoDB for real-time analytics
Enriched streaming events can be monitored in real time with the Striim Web UI, and are available for further Streaming SQL analysis, wizard-based dashboards, and other applications in the cloud. You can use Striim by signing up for the free Striim Developer or a Striim Cloud trial.
Striim can simultaneously ingest data from other sources like Kafka and log files so all data is streamed with equal consistency. Please follow the instructions below to learn how to build an Oracle CDC to NoSQL MongoDB real-time streaming application:
Step 1: Generate Schemas in your Oracle Database
You can find the csv data file in our github repository. Use the following schema to create two empty tables in your source database:
The HOSPITAL_DATA table, containing details about each hospital would be used as a cache to enrich our real-time data stream.
Insert the data from this csv file to the above table.
The HOSPITAL_COMPLICATIONS_DATA contains details of complications in various hospitals. Ideally the data is streamed in real-time but for our tutorial, we will import csv data for CDC.
Step 2: Replacing Batch Extract with Real Time Streaming of CDC Order Events
Striim’s easy-to-use CDC wizards automate the creation of applications that leverage change data capture, to stream events as they are created, from various source systems to various targets. In this example, shown below, we use Striim’s OracleReader (Oracle Change Data Capture) to read the hospital incident data in real-time and stream these insert, update, delete operations, as soon as the transactions commit, into Striim, without impacting the performance of the source database. Configure your source database by entering the hostname, username, password and table names.
Step 3: NULL Value handling
The data contains “Not Available” strings in some of the rows. Striim can manipulate the data in real-time to convert it into Null values using a Continuous Query component. Use the following query to change “Not Available” strings to Null:
SELECT
  t
FROM complication_data_stream t
MODIFY
(
  data[5] = CASE WHEN TO_STRING(data[5]) == "Not Available" THEN NULL ELSE TO_STRING(data[5]) END,
  data[6] = CASE WHEN TO_STRING(data[6]) == "Not Available" THEN NULL ELSE TO_STRING(data[6]) END,
  data[7] = CASE WHEN TO_STRING(data[7]) == "Not Available" THEN NULL ELSE TO_STRING(data[7]) END,
  data[8] = CASE WHEN TO_STRING(data[8]) == "Not Available" THEN NULL ELSE TO_STRING(data[8]) END
);
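For readers who want to check the logic outside Striim, here is a minimal Python sketch of the same transformation. It assumes the event payload is a plain list indexed like `data[]` in the query; the `nullify` helper and the sample row are hypothetical.

```python
NULLABLE_POSITIONS = (5, 6, 7, 8)  # same positions as data[5]..data[8] in the query


def nullify(data, positions=NULLABLE_POSITIONS, marker="Not Available"):
    """Return a copy of data with marker strings replaced by None.

    Other non-null values at the given positions are converted to strings,
    mirroring the TO_STRING calls in the query.
    """
    out = list(data)
    for i in positions:
        if i >= len(out) or out[i] is None:
            continue
        text = str(out[i])
        out[i] = None if text == marker else text
    return out


# Hypothetical row: positions 0-4 are left untouched.
row = ["010001", "a", "b", "c", "d", "Not Available", "3.2", "Not Available", 5]
cleaned = nullify(row)
```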
Step 4: Using Continuous Query for Data Processing
The hospital_complications_data table has a column, COMPARED_TO_NATIONAL, that indicates how a particular complication compares to the national average. We will process this data to generate a column called ‘SCORE_COMPARISON’ that takes the values GOOD, BAD, OUTLIER or NULL and is easier to read. If the value is “Not Available” or “Number of Cases Too Small”, the SCORE_COMPARISON is OUTLIER; if it is “Worse than the National Rate”, it is BAD; if it is “Better than the National Rate” or “No Different than the National Rate”, it is GOOD; any other value yields NULL.
SELECT
  CASE WHEN TO_STRING(data[4]) == "Not Available" OR TO_STRING(data[4]) == "Number of Cases Too Small"
    THEN putUserData(t, 'SCORE_COMPARISON', "OUTLIER")
  WHEN TO_STRING(data[4]) == "Worse than the National Rate"
    THEN putUserData(t, 'SCORE_COMPARISON', "BAD")
  WHEN TO_STRING(data[4]) == "Better than the National Rate" OR TO_STRING(data[4]) == "No Different than the National Rate"
    THEN putUserData(t, 'SCORE_COMPARISON', "GOOD")
  ELSE putUserData(t, 'SCORE_COMPARISON', NULL)
  END
FROM nullified_stream2 t;
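The mapping performed by this query can be expressed as a small Python function, which is handy for unit-testing the rules before wiring them into a pipeline. The function name is hypothetical; the string values are the ones used in the query.

```python
def score_comparison(compared_to_national):
    """Map the COMPARED_TO_NATIONAL text to OUTLIER, BAD, GOOD, or None."""
    if compared_to_national in ("Not Available", "Number of Cases Too Small"):
        return "OUTLIER"
    if compared_to_national == "Worse than the National Rate":
        return "BAD"
    if compared_to_national in (
        "Better than the National Rate",
        "No Different than the National Rate",
    ):
        return "GOOD"
    return None  # any other value, matching the ELSE branch
```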
Step 5: Utilizing Caches For Enrichment
Relational databases typically have a normalized schema, which makes storage efficient but requires joins at query time and does not scale well horizontally. NoSQL databases typically have a denormalized schema, which scales across a cluster because data that is read together is stored together.
With a normalized schema, a lot of the data fields will be in the form of IDs. This is very efficient for the database, but not very useful for downstream queries or analytics without any meaning or context. In this example we want to enrich the raw hospital complications data with reference data from the HOSPITAL_DATA table, correlated by PROVIDER_ID, to produce a denormalized record that includes the hospital name, state, and phone number, making analysis easier by keeping this data together.
Since the Striim platform is a high-speed, low-latency, SQL-based stream processing platform, reference data also needs to be loaded into memory so that it can be joined with the streaming data without slowing things down. This is achieved through the use of the Cache component. Within the Striim platform, caches are backed by a distributed in-memory data grid that can contain millions of reference items distributed around a Striim cluster. Caches can be loaded from database queries, Hadoop, or files, and maintain data in-memory so that joining with them can be very fast. In this example, shown below, the cache is loaded with a query on the HOSPITAL_DATA table using the Striim DatabaseReader.
First we will define a cache type from the console. Run the following query from your console.
CREATE TYPE HospitalDataType(
  PROVIDER_ID String KEY,
  HOSPITAL_NAME String,
  ADDRESS String,
  CITY String,
  STATE String,
  ZIP_CODE String,
  COUNTY String,
  PHONE_NUMBER String
);
Now, drag a DB Cache component from the list of Striim Components on the left and enter your database details. The table will be queried to join with the streaming data.
Query:
SELECT PROVIDER_ID,HOSPITAL_NAME,ADDRESS,CITY,STATE,ZIP_CODE,COUNTY,PHONE_NUMBER FROM QATEST2.HOSPITAL_DATA;
Step 6: Joining Streaming and Cache Data For Real Time Transforming and Enrichment With SQL
We can process and enrich data-in-motion using continuous queries written in Striim’s SQL-based stream processing language. Using a SQL-based language is intuitive for data processing tasks, and most common SQL constructs can be utilized in a streaming environment. The main differences between using SQL for stream processing, and its more traditional use as a database query language, are that all processing is in-memory, and data is processed continuously, such that every event on an input data stream to a query can result in an output.
This is the query we will use to process and enrich the incoming data stream:
SELECT data[1] as provider_id, data[2] as Measure_Name, data[3] as Measure_id,
  t.HOSPITAL_NAME as hosp_name,
  t.state as cache_state,
  t.phone_number as cache_phone
FROM score_comparison_stream n, hospital_data_cache t
WHERE t.provider_id = TO_STRING(n.data[1]);
In this query, we enriched our streaming data using cache data about hospital details. The result of this query is to continuously output enriched (denormalized) events, shown below, for every CDC event that occurs for the HOSPITAL_COMPLICATIONS_DATA table. So with this approach we can join streams from an Oracle Change Data Capture reader with cached data for enrichment.
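Conceptually, the join is a keyed lookup against reference data held in memory. The Python sketch below mirrors the query's column aliases; the dictionary standing in for the cache, the `enrich` helper, and all sample values are hypothetical.

```python
# Stand-in for the cache: reference rows keyed by PROVIDER_ID (sample values are made up).
hospital_data_cache = {
    "010001": {
        "HOSPITAL_NAME": "Example General Hospital",
        "STATE": "AL",
        "PHONE_NUMBER": "5550100",
    },
}


def enrich(data, cache):
    """Join one CDC event with the cache on provider_id (inner-join semantics)."""
    hospital = cache.get(str(data[1]))
    if hospital is None:
        return None  # no matching reference row, so no output event
    return {
        "provider_id": data[1],
        "Measure_Name": data[2],
        "Measure_id": data[3],
        "hosp_name": hospital["HOSPITAL_NAME"],
        "cache_state": hospital["STATE"],
        "cache_phone": hospital["PHONE_NUMBER"],
    }


enriched = enrich([None, "010001", "Example measure", "EX_1"], hospital_data_cache)
unmatched = enrich([None, "999999", "Example measure", "EX_1"], hospital_data_cache)
```

Because the lookup never leaves memory, each event can be enriched as it arrives without a round trip to the source database.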
Step 7: Loading the Enriched Data to the Cloud for Real Time Analytics
Now the Oracle CDC (Oracle change data capture) data, streamed and enriched through Striim, can be stored in NoSQL MongoDB using the MongoDB writer. Enter the connection URL for your MongoDB NoSQL database in the following format and enter your username, password and collection name.
Step 8: Running the Oracle CDC to MongoDB streaming application
Now that the Striim app is configured, you will deploy and run the CDC application. You can also download the TQL file (passphrase: striimrecipes) from our GitHub repository and configure your own source and targets. Import the csv file into the HOSPITAL_COMPLICATIONS_DATA table on your source database after the Striim app has started running. The change data is streamed through the various components for enrichment and processing. You can click on the ‘eye’ next to each stream component to see the data in real time.
Using Kafka for Streaming Replay and Application Decoupling
The stream of enriched events can be backed by or published to Kafka for stream persistence, laying the foundation for streaming replay and application decoupling. Striim’s native integration with Apache Kafka makes it quick and easy to leverage Kafka to make every data source re-playable, enabling recovery even for streaming sources that cannot be rewound. This also acts to decouple applications, enabling multiple applications to be powered by the same data source, and for new applications, caches or views to be added later.
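To see why a persisted stream enables replay and decoupling, consider this minimal in-memory sketch of an append-only log with offsets. It is not the Kafka API; the `ReplayableLog` class and its methods are hypothetical and only illustrate the idea.

```python
class ReplayableLog:
    """Append-only log: records keep their offsets, so readers can rewind."""

    def __init__(self):
        self._records = []

    def append(self, record):
        self._records.append(record)
        return len(self._records) - 1  # offset of the record just written

    def read_from(self, offset):
        """Return every record at or after the given offset."""
        return self._records[offset:]


log = ReplayableLog()
for record in ("event-a", "event-b", "event-c"):
    log.append(record)

# A consumer added later replays from the beginning...
new_consumer_view = log.read_from(0)
# ...while a consumer recovering from a failure resumes at its saved offset.
recovered_view = log.read_from(2)
```

Each consumer tracks its own offset, so adding or restarting one consumer does not affect the others.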
Streaming SQL for Aggregates
We can further use Striim’s Streaming SQL on the denormalized data to make a real-time stream of summary metrics about the events being processed available to Striim Real-Time Dashboards and other applications. For example, to create a running count of each measure_id in the last hour, from the stream of enriched events, you would use a window and the familiar group by clause.
CREATE WINDOW IncidentWindow
OVER EnrichCQ
KEEP WITHIN 1 HOUR
PARTITION BY Measure_id;
SELECT Measure_id,
  COUNT(*) as MeasureCount
FROM IncidentWindow
GROUP BY Measure_id;
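The behavior of a one-hour window with a grouped count can be approximated in Python as follows. This is a sketch under the assumption that events arrive in timestamp order; the `SlidingCounter` class is hypothetical and does not reflect how Striim implements windows internally.

```python
from collections import Counter, deque
from datetime import datetime, timedelta


class SlidingCounter:
    """Running count per measure_id over events no older than keep_within."""

    def __init__(self, keep_within):
        self.keep_within = keep_within
        self.events = deque()  # (timestamp, measure_id), oldest first
        self.counts = Counter()

    def add(self, timestamp, measure_id):
        """Add one event, evict expired ones, and return the current counts."""
        self.events.append((timestamp, measure_id))
        self.counts[measure_id] += 1
        cutoff = timestamp - self.keep_within
        while self.events and self.events[0][0] <= cutoff:
            _, expired = self.events.popleft()
            self.counts[expired] -= 1
            if self.counts[expired] == 0:
                del self.counts[expired]
        return dict(self.counts)


window = SlidingCounter(timedelta(hours=1))
start = datetime(2024, 1, 1, 9, 0)  # arbitrary example timestamp
window.add(start, "A")
window.add(start + timedelta(minutes=30), "A")
window.add(start + timedelta(minutes=45), "B")
latest_counts = window.add(start + timedelta(minutes=61), "A")
```

The first event falls out of the window once the fourth arrives, so the running count only reflects the last hour.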
Monitoring
With the Striim Monitoring Web UI we can now monitor our data pipeline with real-time information for the cluster, application components, servers, and agents. The main monitor page lets you visualize summary statistics for Events Processed, App CPU%, Server Memory, or Server CPU%. Below, the Monitor App page displays our App Resources, Performance and Components.
Summary
In this blog post, we discussed how we can use Striim to:
Use streaming SQL and caches to easily denormalize data in order to make relevant data available together
Load streaming enriched data to MongoDB for real-time analytics
Use Kafka for persistent streams
Create rolling aggregates with streaming SQL
Continuously monitor data pipelines
Wrapping Up
Striim’s power is in its ability to ingest data from various sources and stream it to the same (or different) destinations. This means data going through Striim is held to the same standard of replication, monitoring, and reliability.
In conclusion, this recipe showcased a shift from batch ETL to event-driven distributed stream processing. By capturing Oracle change data events in real time, enriching them through in-memory, SQL-based denormalization, and delivering them to MongoDB, we achieved scalable, cost-effective analytics without disrupting the source database. Moreover, the enriched events, optionally supported by Kafka, offer the flexibility to incrementally integrate additional event-driven applications or services.
To give anything you’ve seen in this recipe a try, sign up for our developer edition or for a Striim Cloud free trial.
Stream Data from PostgreSQL to Google BigQuery with Striim Cloud – Part 1
Use Striim Cloud to stream data securely from PostgreSQL database into Google BigQuery
Benefits
Operational Analytics
Analyze real-time operational, transactional data from PostgreSQL in BigQuery
Secure Data Transfer
Secure data-at-rest and in-flight with simple SSH tunnel configuration and automatic encryption
Build Real-Time Analytical Models
Use the power of Real Time Data Streaming to build Real-Time analytical and ML models
Overview
Striim is a next generation Cloud Data Integration product that offers change data capture (CDC) enabling continuous replication from popular databases such as Oracle, SQLServer, PostgreSQL and many others.
In addition to CDC connectors, Striim has hundreds of automated adapters for file-based data (logs, xml, csv), IoT data (OPCUA, MQTT), and applications such as Salesforce and SAP. Our SQL-based stream processing engine makes it easy to enrich and normalize data before it’s written to targets like BigQuery and Snowflake.
Traditionally, data warehouses have been loaded with batch processing, but with Striim’s streaming platform data can be replicated efficiently in real time.
Securing the in-flight data is very important in any real world application. A jump host creates an encrypted public connection into a secure environment.
In this tutorial, we’ll walk you through how to create a secure SSH tunnel between Striim Cloud and your on-premise/cloud databases with an example where data is streamed securely from a PostgreSQL database into Google BigQuery through SSH tunneling.
Core Striim Components
PostgreSQL Reader: PostgreSQL Reader uses the wal2json plugin to read PostgreSQL change data. 1.x releases of wal2json cannot read transactions larger than 1 GB.
Stream: A stream passes one component’s output to one or more other components. For example, a simple flow that only writes to a file might have this sequence
BigQueryWriter: Striim’s BigQueryWriter writes the data from various supported sources into Google’s BigQuery data warehouse to support real time data warehousing and reporting.
(Optional) Step 1: Secure connectivity to your database
Striim provides multiple methods for secure connectivity to your database. For Striim Developer, you can allowlist the IP address in your email invite.
In Striim Cloud, you can either allow your service IP or follow the below steps to configure a jump server.
Follow the steps below to set up your jump server on Google Compute Engine:
Go to google cloud console -> Compute Engine -> VM instances and create a new VM instance that would act as the jump server.
Add the jump server’s IP address to authorized networks of source database, in this case postgres instance
Step 2: Create SSH tunnel on Striim Cloud
Once the jump host is set up, an SSH tunnel will be created from Striim Cloud UI to establish a connection to the source database through the jump server.
Follow the steps below to configure an SSH tunnel between source database and Striim cloud:
Go to striim cloud console and launch a service instance. Under security create a new SSH tunnel and configure it as below.
Go to the jump server (VM instance) and add the service key copied from the above step.
Now the Striim server is integrated with both Postgres and BigQuery and we are ready to configure the Striim app for data migration.
Step 3: Launch Striim Server and Connect the Postgres Instance
For this recipe, we will host our app in Striim Cloud but there is always a free trial to see the power of Striim’s Change Data Capture.
Follow the steps below to connect Striim server to postgres instance containing the source database:
Click on Apps to display the app management screen:
Click on Create app :
Select Source and Target under create app from wizard:
Give a name to your app and establish the connection between striim server and postgres instance.
Once the connection between Postgres and the Striim server is established, we will link the target data warehouse, in this case Google BigQuery. Striim also offers a schema conversion feature where the table schema can be validated for both source and target, which helps in migrating the source schema to the target database.
Step 4: Targeting Google Bigquery
You have to make sure the BigQuery instance mirrors the tables in the source database. This can be done from the Google Cloud Console interface. Under the project inside Google BigQuery, create the dataset and an empty table containing all the columns that will be populated with data migrated from the Postgres database.
Follow the steps below to create a new dataset in BigQuery and integrate it with the Striim app using a service account:
Create a dataset with tables mirroring the source schema.
Go back to app wizard and enter the service key of your BigQuery instance to connect the app with target data warehouse
Now the Striim server is integrated with both Postgres and BigQuery and we are ready to configure the Striim app for data migration.
Step 5: Configure Striim app using UI
With the source, target and Striim server integrated for data migration, a few configuration steps are made in the easy-to-understand app UI before deploying and running the app.
Click on source and add the connection url (tunnel address), user name and password in proper format:
Click on target and add the input stream, source and target tables and upload the service key for Bigquery instance
Now the app is good to go for deployment and data migration.
Step 6: Deploy and Run the Striim app for Fast Data Migration
In this section you will deploy and run the final app to visualize the power of Change Data Capture in Striim’s next generation technology.
Setting up the Postgres to BigQuery Streaming App
Step 1: Follow the recipe to configure your jump server and SSH tunnel to Striim Cloud.
Step 2: Set up your Postgres Source and BigQuery Target.
Step 3: Create Postgres to BQ Striim app using wizard as shown in the recipe
Step 4: Migrate your data from source to Target by deploying your striim app
Wrapping Up: Start Your Free Trial
Our tutorial showed you how easy it is to migrate data from PostgreSQL to Google BigQuery, a leading cloud data warehouse. By continuously moving your data into BigQuery, you can start building analytics or machine learning models on top, all with minimal impact to your current systems. You can also start ingesting and normalizing more datasets with Striim to take full advantage of your data when combined with the power of BigQuery.
As always, feel free to reach out to our integration experts to schedule a demo, or try Striim for free here.
Tools you need
Striim
Striim’s unified data integration and streaming platform connects clouds, data and applications.
PostgreSQL
PostgreSQL is an open-source relational database management system.
Google BigQuery
BigQuery is a serverless, highly scalable multicloud data warehouse.
Google Compute Engine
Google Compute Engine offers a scalable number of Virtual Machines. In this use case a VM instance will serve as jump host for SSH tunneling
Real-Time Hotspot Detection For Transportation with Striim and BigQuery
Detect and visualize cab booking hotspots using Striim and BigQuery
Benefits
Analyze Real-Time Operational Data
Striim facilitates lightning-speed data transfer that enables tracking of real-time booking data for strategic decisions
Detect Hotspots in Real Time
With Striim’s live dashboard, millions of records can be processed to capture booking updates at different geographical locations
Perform Real-time Analytics with Fast data replication
With CDC technology, live data from multiple sources can be replicated to the cloud for access across multiple teams for analytics
Overview
Transportation services like Uber and Lyft require streaming data with real-time processing for actionable insights on a minute-by-minute basis. While batch data can provide powerful insight on medium or long-term trends, in this age live data analytics is an essential component of enterprise decision making. It is said data loses its importance within 30 minutes of generation. To facilitate better performance for companies that depend heavily on live data, Striim offers continuous data ingestion from multiple data sources in real time. With Striim’s powerful log-based Change Data Capture platform, database transactions can be captured and processed in real time along with data migration to multiple clouds. This technology can be used by e-commerce, food-delivery platforms, transportation services, and many others that harness real-time analytics to generate value. In this blog, I show how real-time cab booking data can be streamed to Striim’s platform and processed in-flight for real-time visualization through Striim’s dashboard while simultaneously being migrated to BigQuery.
Video Walkthrough
Core Striim Components
File Reader: Reads files from disk using a compatible parser.
Stream: A stream passes one component’s output to one or more other components. For example, a simple flow that only writes to a file might have this sequence
Continuous Query: Striim Continuous Queries are continually running SQL queries that act on real-time data and may be used to filter, aggregate, join, enrich, and transform events.
Window: A window bounds real-time data by time, event count or both. A window is required for an application to aggregate or perform calculations on data, populate the dashboard, or send alerts when conditions deviate from normal parameters.
WAction and WActionStore: A WActionStore stores event data from one or more sources based on criteria defined in one or more queries. These events may be related using common key fields.
BigQueryWriter: Striim’s BigQueryWriter writes the data from various supported sources into Google’s BigQuery data warehouse to support real time data warehousing and reporting.
Dashboard: A Striim dashboard gives you a visual representation of data read and written by a Striim application
Using Striim for CDC
Change Data Capture has gained popularity in the last decade as companies have realized the power of real-time data analysis from OLTP databases. In this example, data is acquired from a CSV file and streamed with a window of 30 minutes. Due to memory limitation, the window was kept for 30 minutes for better visualization. Ideally, Striim can handle a window of even 1 second when the amount of data is huge. The data was also processed inside Striim and the change was captured and migrated to BigQuery data warehouse.
The dataset used in this example contains 4.5 million Uber booking records from NYC, with features including DateTime, latitude, longitude, and TLC base company. The goal is to stream the data through Striim’s CDC platform and detect the areas that have more bookings (hotspots) every 30 minutes. The latitude and longitude values were converted using the following query to cluster them into certain areas
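One common way to cluster coordinates into areas is to snap them to a grid by rounding. The Python sketch below assumes rounding to two decimal places; that precision and the `grid_cell` helper are illustrative assumptions, not necessarily what the recipe's query does.

```python
def grid_cell(lat, lon, precision=2):
    """Snap a coordinate to a grid cell by rounding both values.

    The precision of 2 decimal places is an assumption for illustration.
    """
    return (round(lat, precision), round(lon, precision))


# Two nearby (made-up) pickups fall into the same cell.
cell_a = grid_cell(40.7589, -73.9851)
cell_b = grid_cell(40.7612, -73.9887)
```

A coarser precision produces larger areas and fewer cells; a finer one gives more detailed but sparser hotspots.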
The following steps were followed before deploying the app
Step 1: Reading Data from CSV and Streaming into Continuous Query through stream
In this step, the data is read from the data source using a delim-separated parser (cabCsvSource) with FileReader adapter used for CSV source. The data is then streamed into cabCSvSourceStream which reaches cabCQ for continuous query. The SQL query at CabCQ2 converts the incoming data into required format
Step 2: Sending the data for processing in Striim as well as into BigQuery
The data returned from the continuous query is then sent for processing through a 30-minute window and also migrated to BigQuery for storage. This is a unique feature of the Striim platform that allows data migration and processing at the same time. The data transferred to Bigquery can be used by various teams for analytics while Striim’s processing gives valuable insights through its dashboard.
The connection between Striim and BigQuery is set up through a service account that allows Striim application to access BigQuery. A table structure is created within BigQuery instance that replicates the schema of incoming stream of data.
Step 3: Aggregating Data using Continuous Query
After the data is captured, a query is applied on each window that clusters the latitudes and longitudes of pickup locations into discrete areas and returns the aggregate demand of each area in those 30 minutes. There is a slight difference between the clustering query that goes into BigQuery(AggregateCQ) and the one that goes into StriimWaction(LatLonCQ). The data in StriimWaction is used for dashboard tracking hotspots, so the first latitude and longitude value is taken as the estimate of area. The data that goes into BigQuery returns all latitude longitude values and could be used for further analytics.
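The aggregation step can be sketched as counting bookings per 30-minute window and per area. This Python example assumes fixed (tumbling) half-hour windows and grid cells obtained by rounding to two decimals; both choices, and the sample bookings, are assumptions for illustration.

```python
from collections import Counter
from datetime import datetime


def window_start(ts):
    """Floor a timestamp to the start of its 30-minute window."""
    return ts.replace(minute=(ts.minute // 30) * 30, second=0, microsecond=0)


def hotspot_counts(bookings, precision=2):
    """Count bookings per (window start, grid cell)."""
    counts = Counter()
    for ts, lat, lon in bookings:
        cell = (round(lat, precision), round(lon, precision))
        counts[(window_start(ts), cell)] += 1
    return counts


# Made-up bookings: (pickup time, latitude, longitude).
bookings = [
    (datetime(2014, 4, 1, 0, 11), 40.7589, -73.9851),
    (datetime(2014, 4, 1, 0, 17), 40.7612, -73.9887),
    (datetime(2014, 4, 1, 0, 42), 40.7589, -73.9851),
]
counts = hotspot_counts(bookings)
```

Cells with the highest counts in a given window are the hotspots for that half hour.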
Striim’s Dashboard for Real Time Analytics
The dashboard ingested data from two different streams. The window30CabData provided data to the bar chart that tracked the number of vehicles from each company every 30 mins and the vector map fetched data from LatLon2dashboardWaction that had the aggregated count of bookings for every area in a 30-minute window. As seen from the dashboard snippet below, the dashboard was configured to return red for high demand (>30), yellow for medium demand (between 15-30), and green for low demand (less than 15). This can be very useful to companies for real-time tracking analytics and data migration. The dashboard has two components: a query, where data is fetched from the app and processed as required, and a configuration, where specifications on visualization are entered. The SQL queries below show the query for the vector map and bar chart. The two snippets from the dashboard show an instant of booking at different locations and the number of vehicles from each TLC company.
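The color rule described above amounts to a simple threshold function. The sketch below uses the thresholds from the text and treats the boundary values 15 and 30 as medium demand; the function name is hypothetical.

```python
def demand_color(booking_count):
    """Map an area's booking count in a window to a dashboard color."""
    if booking_count > 30:
        return "red"     # high demand
    if booking_count >= 15:
        return "yellow"  # medium demand (15 to 30 inclusive)
    return "green"       # low demand
```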
Migrating Data to BigQuery
Finally, the aggregated data along with the source data was migrated to BigQuery. Migration to BigQuery followed the same process of creating a table schema that mirrored the structure of incoming data from Striim.
Flowchart
Below is the UI of cabBookingApp that tracks bookings within a given time window and returns aggregated demand in a Striim dashboard. The app also streams data from the source to BigQuery.
Here is an overview of each component from the flowchart:
Setting Up the Tracking Application
Step 1: Download the data and Sample TQL file from our GitHub repo
You can download the TQL files for the streaming app and lag monitor app from our GitHub repository. Deploy the Striim app on your Striim server.
Step 2: Configure your CSV source and BigQuery target and add it to the source and target components of the app
You can find the csv dataset in our GitHub repo. Set up your BigQuery dataset and table that will act as a target for the streaming application
Step 3: Follow the recipe to create Striim Dashboard for real-time analytics
The recipe details how to set up a Striim dashboard for this use case
Step 4: Run your app and dashboard
Deploy and run your app and dashboard for real-time tracking and analytics
Why Striim?
Striim is a single platform for real-time data ingestion, stream processing, pipeline monitoring, and real-time delivery with validation. It uses low-impact Change Data Capture technology to migrate a wide variety of high-volume, high-velocity data from enterprise databases in real-time. Using Striim’s in-flight data processing and real-time dashboard, companies can generate maximum value from streaming data. Enterprises dealing with astronomical data can incorporate Striim to maximize profit with real-time strategic decisions. Striim is used by companies like Google, Macy’s, and Gartner for real-time data migration and analytics. In this data-driven age, generate maximum profit for your company using Striim’s CDC-powered platform.
Real-Time Point-of-Sale (POS) Analytics with Striim and BigQuery
How to use Striim to generate transaction reports and send alerts when transaction counts are higher or lower than average
Benefits
Detect Anomalies in Real Time
Generate reports on transaction activity and get alerts when transaction counts deviate from the mean
Manage Inventory Levels
Monitor your stock levels in real time and get notified when stock is low
Get a Live View of Your Sales
Use POS Analytics to get real-time insights into customer purchases
Analyze point-of-sale retail data in real-time with Striim, BigQuery, and the BI tool of your choice.
Streaming Change Data Capture – Striim
Streaming Data Pipelines in SQL – Striim
Streaming Data Visualization – Striim
Cloud Data Warehouse with incremental views – BigQuery
Reporting & BI – Metabase
Overview
Before following the instructions below, sign up for Striim Developer to run through this tutorial (no credit card required).
In the web UI, from the top menu, select Apps > View All Apps.
If you don’t see PosApp anywhere on the page (you may need to expand the Samples group), download the TQL from Striim’s GitHub page. Then select Create App > Import TQL file, navigate to Striim/Samples/PosApp, double-click PosApp.tql, enter Samples as the namespace, and click Import.
At the bottom right corner of the PosApp tile, select … > Manage Flow. The Flow Designer displays a graphical representation of the application flow:
The following is a simplified diagram of that flow:
Step 1: Acquire Data
Striim has hundreds of connectors including change data capture from databases such as Oracle, SQLServer, MySQL, and PostgreSQL.
In this example, we’ll use a simple file source. We call this a ‘Source’ component in Striim.
Double-clicking CsvDataSource opens it for editing:
This is the primary data source for this application. In a real-world application, it would be real-time data. Here, the data comes from a comma-delimited file, posdata.csv. Here are the first two lines of that file:
BUSINESS NAME,MERCHANT ID,PRIMARY ACCOUNT NUMBER,POS DATA CODE,DATETIME,EXP DATE,CURRENCY CODE,AUTH AMOUNT,TERMINAL ID,ZIP,CITY
COMPANY 1,D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu,6705362103919221351,0,20130312173210,0916,USD,2.20,5150279519809946,41363,Quicksand
In Striim terms, each line of the file is an event, which in many ways is comparable to a row in a SQL database table, and can be used in similar ways. Click Show Advanced Settings to see the DSVParser properties:
The True setting for the header property indicates that the first line contains field labels that are not to be treated as data.
The “Output to” stream CsvStream automatically inherits the WAEvent type associated with the CSVReader:
The only field used by this application is “data”, an array containing the delimited fields.
Step 2: Filter The Data Stream
CsvDataSource outputs the data to CsvStream, which is the input for the query CsvToPosData:
This continuous query (CQ) converts the comma-delimited fields from the source into typed fields in a stream that can be consumed by other Striim components. Here, “data” refers to the array mentioned above, and the number in brackets specifies a field from the array, counting from zero. Thus data[1] is MERCHANT ID, data[4] is DATETIME, data[7] is AUTH AMOUNT, and data[9] is ZIP.
The TO_STRING, TO_DATEF, and TO_DOUBLE functions cast the fields as the types to be used in the Output to stream. The DATETIME field from the source is converted to both a dateTime value, used as the event timestamp by the application, and (via the DHOURS function) an integer hourValue, which is used to look up historical hourly averages from the HourlyAveLookup cache, discussed below.
The remaining fields are discarded. Thus the first line of data from posdata.csv has at this point been reduced to five values:
D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu (merchantId)
20130312173210 (DateTime)
17 (hourValue)
2.20 (amount)
41363 (zip)
The CsvToPosData query outputs the processed data to PosDataStream:
PosDataStream assigns the five remaining fields the names and data types in the order listed above:
MERCHANT ID to merchantID
DATETIME to dateTime
the DATETIME substring to hourValue
AUTH AMOUNT to amount
ZIP to zip
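As a rough illustration of the conversion described above, the following Python sketch applies the same field selection to the sample line from posdata.csv. It is not Striim TQL. The function name `to_pos_data` and the dictionary layout are assumptions made for this example; only the array indexes and the output field names come from the text. Index 0 (the business name) is not used, so it is left as a placeholder.

```python
from datetime import datetime

# One event's "data" array, taken from the sample line of posdata.csv.
# Index 0 (business name) is unused here and left as a placeholder.
data = [
    "<business name>",
    "D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",  # data[1] MERCHANT ID
    "6705362103919221351",
    "0",
    "20130312173210",                        # data[4] DATETIME
    "0916",
    "USD",
    "2.20",                                  # data[7] AUTH AMOUNT
    "5150279519809946",
    "41363",                                 # data[9] ZIP
    "Quicksand",
]


def to_pos_data(fields):
    """Convert the untyped field array into the five typed output fields."""
    date_time = datetime.strptime(fields[4], "%Y%m%d%H%M%S")
    return {
        "merchantId": str(fields[1]),
        "dateTime": date_time,
        "hourValue": date_time.hour,  # hour of day, used for the cache lookup
        "amount": float(fields[7]),
        "zip": str(fields[9]),
    }


event = to_pos_data(data)
print(event)
```

The hour is derived from the parsed timestamp rather than from a substring, which is one way to get the same integer value that the text attributes to the DHOURS function.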
Step 3: Define the Data Set
PosDataStream passes the data to the window PosData5Minutes:
A window is in many ways comparable to a table in a SQL database, just as the events it contains are comparable to rows in a table. The Mode and Size settings determine how many events the window will contain and how it will be refreshed. With the Mode set to Jumping, this window is refreshed with a completely new set of events every five minutes. For example, if the first five-minute set of events is received when the application runs from 1:00 pm through 1:05 pm, then the next set of events will run from 1:06 through 1:10, and so on. (If the Mode were set to Sliding, the window would continuously add new events and drop old ones so as to always contain the events of the most recent five minutes.)
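The jumping behavior can be pictured with a small Python sketch. This is only an illustration, not how Striim implements windows: it assumes batches are aligned to five-minute clock boundaries, and the names `window_start` and `jumping_batches` are invented for the example. The point it shows is that each batch is a completely new, non-overlapping set of events.

```python
from datetime import datetime


def window_start(timestamp, minutes=5):
    """Round a timestamp down to the start of its five-minute interval."""
    return timestamp.replace(
        minute=timestamp.minute - timestamp.minute % minutes,
        second=0,
        microsecond=0,
    )


def jumping_batches(events, minutes=5):
    """Group time-ordered (timestamp, payload) events into disjoint batches."""
    batches = {}
    for timestamp, payload in events:
        batches.setdefault(window_start(timestamp, minutes), []).append(payload)
    # dicts keep insertion order, so batches come out in arrival order
    return list(batches.values())


events = [
    (datetime(2013, 3, 12, 13, 0, 10), "a"),
    (datetime(2013, 3, 12, 13, 4, 59), "b"),
    (datetime(2013, 3, 12, 13, 5, 0), "c"),
    (datetime(2013, 3, 12, 13, 9, 30), "d"),
    (datetime(2013, 3, 12, 13, 12, 0), "e"),
]
batches = jumping_batches(events)
print(batches)
```

A sliding window, by contrast, would let the same event appear in several consecutive views of the window.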
Step 4: Process and Enhance the Data
The PosData5Minutes window sends each five-minute set of data to the GenerateMerchantTxRateOnly query. As you can see from the following schema diagram, this query is fairly complex:
The GenerateMerchantTxRateOnly query combines data from the PosData5Minutes event stream with data from the HourlyAveLookup cache. A cache is similar to a source, except that the data is static rather than real-time. In the real world, this data would come from a periodically updated table in the payment processor’s system containing historical averages of the number of transactions processed for each merchant for each hour of each day of the week (168 values per merchant). In this sample application, the source is a file, hourlyData.txt, which to simplify the sample data set has only 24 values per merchant, one for each hour in the day.
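To make the idea of a cache concrete, here is a minimal Python sketch of a static lookup keyed by merchant and hour. The three-column "merchantId,hourValue,hourlyAve" layout, the merchant names, and the numbers are all assumptions for illustration; the actual layout of hourlyData.txt is not described here.

```python
import csv
import io

# Hypothetical excerpt in an assumed "merchantId,hourValue,hourlyAve" layout.
SAMPLE = "merchant-A,16,1100\nmerchant-A,17,1200\nmerchant-B,17,60\n"


def load_cache(text):
    """Load static rows into a dict keyed by (merchantId, hourValue)."""
    cache = {}
    for merchant_id, hour_value, hourly_ave in csv.reader(io.StringIO(text)):
        cache[(merchant_id, int(hour_value))] = int(hourly_ave)
    return cache


def lookup(cache, merchant_id, hour_value):
    """Return the historical hourly average, or None when no entry exists."""
    return cache.get((merchant_id, hour_value))


cache = load_cache(SAMPLE)
print(lookup(cache, "merchant-A", 17))
```

With one value per hour of the day, a full cache would hold 24 keys per merchant; the real-world variant with 168 values would add the day of the week to the key.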
For each five-minute set of events received from the PosData5Minutes window, the GenerateMerchantTxRateOnly query outputs one event for each merchantID found in the set to MerchantTxRateOnlyStream, which applies the MerchantTxRate type. The easiest way to summarize what is happening in the above diagram is to describe where each of the fields in the MerchantTxRateOnlyStream comes from:
field | description | TQL
merchantId | the merchantID field from PosData5Minutes | SELECT p.merchantID
zip | the zip field from PosData5Minutes | SELECT … p.zip
startTime | the dateTime field for the first event for the merchantID in the five-minute set from PosData5Minutes | SELECT … FIRST(p.dateTime)
count | count of events for the merchantID in the five-minute set from PosData5Minutes | SELECT … COUNT(p.merchantID)
totalAmount | sum of amount field values for the merchantID in the five-minute set from PosData5Minutes | SELECT … SUM(p.amount)
hourlyAve | the hourlyAve value for the current hour from HourlyAveLookup, divided by 12 to give the five-minute average | SELECT … l.hourlyAve/12 … WHERE … p.hourValue = l.hourValue
upperLimit | the hourlyAve value for the current hour from HourlyAveLookup, divided by 12, then multiplied by 1.15 if the value is over 10,000, 1.2 if the value is between 801 and 10,000, 1.25 if the value is between 201 and 800, or 1.5 if the value is 200 or less | SELECT … l.hourlyAve/12 * CASE WHEN l.hourlyAve/12 > 10000 THEN 1.15 WHEN l.hourlyAve/12 > 800 THEN 1.2 WHEN l.hourlyAve/12 > 200 THEN 1.25 ELSE 1.5 END
lowerLimit | the hourlyAve value for the current hour from HourlyAveLookup, divided by 12, then divided by 1.15 if the value is over 10,000, 1.2 if the value is between 801 and 10,000, 1.25 if the value is between 201 and 800, or 1.5 if the value is 200 or less | SELECT … l.hourlyAve/12 / CASE WHEN l.hourlyAve/12 > 10000 THEN 1.15 WHEN l.hourlyAve/12 > 800 THEN 1.2 WHEN l.hourlyAve/12 > 200 THEN 1.25 ELSE 1.5 END
category, status | placeholders for values to be added | SELECT … '<NOTSET>'
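The limit arithmetic in the CASE expressions above can be checked with a short Python sketch. The function names are invented for this example, and the input of 183,600 transactions per hour is an assumed value, chosen because it reproduces the limits quoted in the alert messages later in this tutorial (about 13304.35 and 17595.0).

```python
def tolerance_factor(five_min_ave):
    """Mirror the CASE expression: busier merchants get a tighter band."""
    if five_min_ave > 10000:
        return 1.15
    if five_min_ave > 800:
        return 1.2
    if five_min_ave > 200:
        return 1.25
    return 1.5


def limits(hourly_ave):
    """Return (five-minute average, upperLimit, lowerLimit)."""
    five_min_ave = hourly_ave / 12  # an hour holds twelve five-minute blocks
    factor = tolerance_factor(five_min_ave)
    return five_min_ave, five_min_ave * factor, five_min_ave / factor


five_min_ave, upper_limit, lower_limit = limits(183600)  # assumed hourly average
print(five_min_ave, upper_limit, lower_limit)
```

Multiplying and dividing by the same factor makes the band asymmetric around the average: the upper limit sits further from the average than the lower limit does.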
The MerchantTxRateOnlyStream passes this output to the GenerateMerchantTxRateWithStatus query, which populates the category and status fields by evaluating the count, upperLimit, and lowerLimit fields:
SELECT merchantId,
zip,
startTime,
count,
totalAmount,
hourlyAve,
upperLimit,
lowerLimit,
CASE
WHEN count > 10000 THEN 'HOT'
WHEN count > 800 THEN 'WARM'
WHEN count > 200 THEN 'COOL'
ELSE 'COLD' END,
CASE
WHEN count > upperLimit THEN 'TOOHIGH'
WHEN count < lowerLimit THEN 'TOOLOW'
ELSE 'OK' END
FROM MerchantTxRateOnlyStream
The category values are used by the Dashboard to color-code the map points. The status values are used by the GenerateAlerts query.
The output from the GenerateMerchantTxRateWithStatus query goes to MerchantTxRateWithStatusStream.
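For readers who prefer to trace the two CASE expressions outside of TQL, the following Python sketch mirrors them. The function names are assumptions for this example; the thresholds and labels are copied from the query above, and the sample counts and limits are the ones quoted in the alert messages in Step 6.

```python
def category(count):
    """Mirror the first CASE expression: bucket merchants by volume."""
    if count > 10000:
        return "HOT"
    if count > 800:
        return "WARM"
    if count > 200:
        return "COOL"
    return "COLD"


def status(count, upper_limit, lower_limit):
    """Mirror the second CASE expression: compare count with the limits."""
    if count > upper_limit:
        return "TOOHIGH"
    if count < lower_limit:
        return "TOOLOW"
    return "OK"


print(category(12012), status(12012, 17595.0, 13304.347826086958))
print(category(15853), status(15853, 17595.0, 13304.347826086958))
```

Note that category depends only on the raw count, while status depends on how the count compares with that merchant's own historical band.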
Step 5: Populate the Dashboard
The GenerateWactionContent query enhances the data from MerchantTxRateWithStatusStream with the merchant’s company, city, state, and zip code, and the latitude and longitude to position the merchant on the map, then populates the MerchantActivity WActionstore:
In a real-world application, the data for the NameLookup cache would come from a periodically updated table in the payment processor’s system, but the data for the ZipLookup cache might come from a file such as the one used in this sample application.
When the application finishes processing all the test data, the WActionStore will contain 423 WActions, one for each merchant. Each WAction includes the merchant’s context information (MerchantId, StartTime, CompanyName, Category, Status, Count, HourlyAve, UpperLimit, LowerLimit, Zip, City, State, LatVal, and LongVal) and all events for that merchant from the MerchantTxRateWithStatusStream (merchantId, zip, startTime, count, totalAmount, hourlyAve, upperLimit, lowerLimit, category, and status for each of 40 five-minute blocks). This data is used to populate the dashboard, as detailed in PosAppDash.
Step 6: Trigger Alerts
MerchantTxRateWithStatusStream sends the detailed event data to the GenerateAlerts query, which triggers alerts based on the Status value:
When a merchant’s status changes to TOOLOW or TOOHIGH, Striim will send an alert such as, “WARNING – alert from Striim – POSUnusualActivity – 2013-12-20 13:55:14 – Merchant Urban Outfitters Inc. count of 12012 is below lower limit of 13304.347826086958.” The “raise” value for the flag field instructs the subscription not to send another alert until the status returns to OK.
When the status returns to OK, Striim will send an alert such as, “INFO – alert from Striim – POSUnusualActivity – 2013-12-20 14:02:27 – Merchant Urban Outfitters Inc. count of 15853 is back between 13304.347826086958 and 17595.0.” The “cancel” value for the flag field instructs the subscription to send an alert the next time the status changes to TOOLOW or TOOHIGH. See Sending alerts from applications for more information on info, warning, raise, and cancel.
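The raise and cancel behavior described above amounts to a small state machine: alert once when a merchant leaves OK, stay quiet while it remains out of range, and send one informational message when it returns. The Python sketch below illustrates that logic under those stated assumptions. It is not Striim's alerting code, and the function name and tuple layout are invented for the example.

```python
def alerts_for(statuses):
    """Return the alerts that a sequence of status values would trigger."""
    alerts = []
    raised = False  # True while a "raise" alert is outstanding
    for current in statuses:
        if current != "OK" and not raised:
            alerts.append(("WARNING", "raise", current))
            raised = True
        elif current == "OK" and raised:
            alerts.append(("INFO", "cancel", current))
            raised = False
    return alerts


history = ["OK", "TOOLOW", "TOOLOW", "OK", "TOOHIGH", "TOOLOW", "OK"]
alerts = alerts_for(history)
for alert in alerts:
    print(alert)
```

In this sketch, a direct move from TOOHIGH to TOOLOW produces no second warning, which follows the description that no further alert is sent until the status returns to OK.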
Step 7: Stream data to BigQuery
Now you can stream the enriched point-of-sale analytical data to cloud data platforms such as BigQuery, Snowflake, and Redshift. In this example, we’re going to stream the data to BigQuery for storage and analysis.
Striim can stream data into BigQuery in real-time while optimizing costs with partition pruning on merge operations. After the data is loaded into BigQuery, your team can analyze it with your business intelligence tool of choice. In this example we’re generating the reports in Metabase.
Get Started
Try this recipe yourself for free by signing up for a trial or talking to our sales team for hands-on guidance. Striim can be deployed on your laptop, in a Docker container, or directly on your cloud-service provider of choice.
Tools you need
Striim
Striim’s unified data integration and streaming platform connects clouds, data and applications.
Google BigQuery
BigQuery is a serverless, highly scalable multicloud data warehouse.
BI Tool (Metabase)
Metabase is an open source business intelligence tool.
Streaming Data Integration: Using CDC to Stream Database Changes
How to use the PostgreSQL CDC (PostgreSQL Reader) with a Striim Target
Benefits
Get a Live View
Use Striim CDC to stream data for a continuous view of your transactional data
Empower Your Teams
Give teams across your organization a real-time view into your database transactions
React in Real Time
React to business events as they happen, not minutes or hours later.
Overview
This is the first in a two-part blog post discussing how to use Striim for streaming database changes to Apache Kafka. Striim offers continuous data ingestion from databases and other sources in real time; transformation and enrichment using Streaming SQL; delivery of data to multiple targets in the cloud or on-premises; and visualization of results. In this part, we will use Striim’s low-impact, real-time change data capture (CDC) feature to stream database changes (inserts, updates, and deletes) from an operational database into Striim.
What is Change Data Capture?
Databases maintain change logs that record all changes made to the database contents and metadata. These change logs can be used for database recovery in the event of a crash, and also for replication or integration.
With Striim’s log-based CDC, new database transactions, including inserts, updates, and deletes, are read from source databases’ change logs and turned into a stream of events without impacting the database workload. Striim offers CDC for Oracle, SQL Server, HPE NonStop, MySQL, PostgreSQL, MongoDB, and MariaDB.
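Conceptually, a consumer of a change stream replays each event against its own copy of the data. The Python sketch below shows that idea with a dictionary standing in for a replica table. The event layout (`op`, `key`, `row`) and the sample rows are hypothetical and do not represent the event format that Striim emits.

```python
def apply_change(replica, event):
    """Apply one change event to an in-memory replica keyed by primary key."""
    op = event["op"]
    if op in ("INSERT", "UPDATE"):
        replica[event["key"]] = event["row"]
    elif op == "DELETE":
        replica.pop(event["key"], None)
    else:
        raise ValueError(f"unknown operation: {op}")


# Hypothetical change log entries, in commit order.
change_log = [
    {"op": "INSERT", "key": 1, "row": {"name": "Ada", "city": "London"}},
    {"op": "INSERT", "key": 2, "row": {"name": "Lin", "city": "Taipei"}},
    {"op": "UPDATE", "key": 2, "row": {"name": "Lin", "city": "Tokyo"}},
    {"op": "DELETE", "key": 1, "row": None},
]

replica = {}
for event in change_log:
    apply_change(replica, event)
print(replica)
```

Because the events are applied in commit order, the replica ends up in the same state as the source without the source ever being queried directly.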
Why use Striim’s CDC?
Businesses use Striim’s CDC capabilities to feed real-time data to their big data lakes, cloud databases, and enterprise messaging systems, such as Kafka, for timely operational decision making. They also migrate from on-premises databases to cloud environments without downtime and keep cloud-based analytics environments up-to-date with on-premises databases using CDC.
How to use Striim’s CDC?
Striim’s easy-to-use CDC template wizards automate the creation of applications that leverage change data capture, to stream events as they are created, from various source systems to various targets. Apps created with templates may be modified using Flow Designer or by exporting TQL, editing it, and importing the modified TQL. Striim has templates for many source-target combinations.
In addition, Striim offers pre-built integration applications for bulk loading and CDC from PostgreSQL source databases to target systems including PostgreSQL database, Kafka, and files. You can start these applications in seconds by going to the Applications section of the Striim platform.
Striim pre-built sample integration applications.
In this post, we will show how to use the PostgreSQL CDC (PostgreSQL Reader) with a Striim Target using the wizards for a custom application instead of using the pre-built application mentioned above. The instructions below assume that you are using the PostgreSQL instance that comes with the Striim platform. If you are using your own PostgreSQL database instance, please review our instructions on how to set up PostgreSQL for CDC.
Step 1: Using the CDC Template
To start building the CDC application, in the Striim web UI, go to the Apps page and select Add App > Start with Template. Enter PostgreSQL in the search field to narrow down the sources and select “PostgreSQL Reader to Striim”.
Wizard template selection when creating a new app.
Next enter the name and namespace for your application (the namespace is a way of grouping applications together).
Step 2: Specifying the Data Source Properties
In the SETUP POSTGRESQL READER specify the data source and table properties:
the connection URL, username, and password.
the tables for which you want to read change data.
Configuring the data source in the wizard.
After you complete this step, your application will open in the Flow Designer.
The wizard generates a data flow.
In the flow designer, you can add various processors, enrichers, transformers, and targets as shown below to complete your pipeline, in some cases with zero coding.
Flow designer enrichers and processors.
Flow designer event transformers and targets.
In the next blog post, we will discuss how to add a Kafka target to this data pipeline. In the meantime, please feel free to request a demo with one of our lead technologists, tailored to your environment.
Tools you need
Striim
Striim’s unified data integration and streaming platform connects clouds, data and applications.
PostgreSQL
PostgreSQL is an open-source relational database management system.
Migrating from MySQL to Google Cloud SQL with Change Data Capture
How to use Change Data Capture (CDC) to synchronize data from MySQL into a Google Cloud SQL instance
Benefits
Simplify Cloud Migrations
Say goodbye to downtime and complex migrations. Striim seamlessly loads and syncs your changing data.
Add New Cloud Applications
Add new, client-facing applications by synchronizing an existing on-premises application’s data set.
Sync Current and New Databases
Keep data in your current MySQL instance in sync with your new CloudSQL deployment until your migration goes live
Overview
Migrating from MySQL to Google Cloud SQL opens up cloud services that offer a wealth of capabilities with low management overhead and cost. But moving your existing on-premises applications to the cloud can be a challenge, especially for applications built on top of on-premises deployments of databases like MySQL. In this blog post we are going to use a database technology called Change Data Capture to synchronize data from MySQL into a Google Cloud SQL instance.
Introduction
One of the major hurdles when migrating applications, whether you’re changing the technology or moving to the cloud, is migrating your data. The older and bigger the application, the more difficult that migration becomes. Traditional Extract, Transform, and Load (ETL) tools require multiple passes and, potentially, significant downtime to handle data migration activities. This is where real-time ETL tools like Striim shine.
There are a number of benefits in migrating applications this way, such as being able to:
Add a new, client-facing cloud application by synchronizing an existing, traditionally on-premises application’s data set.
Migrate one or more on-premises applications (with data) to the cloud for production testing with almost zero impact on the existing application.
Let’s walk through an example of connecting an on-premises instance of MySQL to Google Cloud SQL for MySQL.
Step 1: Set Up the MySQL Database
Before we dive into Striim, we are assuming you have an on-premises MySQL instance already configured and containing relevant data. For the purpose of this post, we have loaded a dataset from a GitHub source (https://github.com/datacharmer/test_db) into a local MySQL instance. The data set is pretty large, which is perfect for our purposes, and contains a dummy set of employee information, including salaries.
Rather than importing all the data this data set contains, I’ve excluded the load_salaries2.dump and load_salaries3.dump files. This will allow us to insert a lot of data after Striim has been configured to show how powerful Change Data Capture is.
Step 2: Set Up the Striim Application
Now that we have an on-premises data set in MySQL, let’s set up a new Striim application on Google Cloud Platform to act as the migration service.
Open your Google Cloud console and open or start a new project. Go to the marketplace and search for Striim.
A number of options should return, but the one we’re after is the first item, which allows integration of real-time data to GCP.
Select this option and start the deployment process by pressing the deploy button at the bottom of this screen. For this tutorial, we’ll use the basic defaults for a Striim server. In production, however, you’d need to size appropriately depending on your load.
Step 3: Create a Target Database
While we wait for the Striim server to deploy, let’s create a Google SQL database to which we’ll migrate our database. Select the SQL option from the side menu in Google Cloud and create a new MySQL instance.
Once again, we’ll use the defaults for a basic Google MySQL instance. Open the instance and copy the instance connection name for use later. Then open the database instance and take note of the IP address.
We also need to create the database structure for the data we imported into the local MySQL instance. To do this, open the Google Cloud shell, log into the MySQL server, and run the SQL to create the table structure. Striim also needs a checkpoint table to keep the state in the event of failures, so create that table structure using the following:
Open the Google Console and go back to the Deployment Manager, and click “Visit site”. It’s important to note that the Striim VM currently has a dynamic external IP address. In a production environment, you’ll want to set this to static so it won’t change.
When you first visit the site, you’ll see a congratulations screen. Click accept and fill in the basic details. Leave the license field blank for the trial version of Striim, or add your license key if you have one.
The first thing we need to do is create an application that performs the initial load of our current data set. There is no wizard for setting up an initial load application that we require, so go to Apps and create an app from scratch.
First, let’s add a MySQL reader from the sources tab on the left. This will access our local database to load the initial set of data. To read from a local server we need to use a JDBC style URL using the template: jdbc:mysql://:/
We are also mapping the tables we want to sync by specifying them in the tables folder using
.
This allows us to restrict what is synchronized. Finally, under output to, specify a new WAEvent type for this connector.
Once we have our source wired up, we need to add a target to the flow so our data starts to transfer. Using a process similar to the one we used previously, add the GoogleCloudWriter target with the Google cloud instance in the connection URL. For the tables, this time we need to match the source and targets together using the form: .,.
Once both the source and target connectors have been configured, deploy and start the application to begin the initial load process.
After the application goes through the starting process we can click on the monitor button to show the performance of the application. This will take a little while to complete, depending on your database size.
Step 5: Change Data Capture
While the initial load takes place, let’s create the Change Data Capture (CDC) application to get ready for the synchronization process.
This time we are going to use a wizard to create the application. Click on Add Apps, then select the option to start with a Template. Striim comes with a lot of templates for different use cases out of the box. Scroll down to Streaming Integration for MySQL, click “show more,” then look for MySQL CDC to Cloud SQL MySQL. This option sets up a CDC application for MySQL to Google Cloud SQL.
Fill out the connection information for your on-premises application and click next. This should connect to the agent and ensure everything is correct.
Once everything is connected, check the tables you selected in the first application. These will synchronize any changes that occur.
Now we need to link our source to our target. Specify the connection details for your Google SQL instance using the IP address from the previous step. Fill in the username, password, and list of tables from the source database and click next. When you’ve finished the wizard, the application should be ready to go.
If the previous data load application has finished, stop the data load application and start the Change Data Capture application. Once the application has started, start loading transactions into your on-premises database. This should start synchronizing the data that changes up to your Google Cloud instance.
Open the Change Data Capture application and select monitor. You should see both the input and output figures as the application keeps track of your on-premises database. The activity chart should be showing the throughput of the records synchronizing from one location to another.
If you open the database console in Google Cloud and run a “SELECT COUNT(salary) FROM salaries” statement a couple of times, you should see the count figure rising.
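If you would rather script this check than rerun the statement by hand, the polling pattern looks roughly like the Python sketch below. To keep the example self-contained, an in-memory SQLite database stands in for the Cloud SQL instance and the inserts simulate rows arriving through CDC. The two-column table layout is an assumption, and against a real instance you would use a MySQL client connection instead.

```python
import sqlite3

# In-memory SQLite stands in for the Cloud SQL target in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE salaries (emp_no INTEGER, salary INTEGER)")


def salary_count(connection):
    """Run the same count query the tutorial suggests."""
    return connection.execute("SELECT COUNT(salary) FROM salaries").fetchone()[0]


counts = []
# Each batch simulates changes that were synchronized between two polls.
for batch in ([(10001, 60117), (10001, 62102)], [(10002, 65828)]):
    conn.executemany("INSERT INTO salaries VALUES (?, ?)", batch)
    counts.append(salary_count(conn))

print(counts)
```

A rising sequence of counts between polls is the signal that changes are flowing from the source to the target.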
Step 6: Adding More Load
While the servers are synchronizing, let’s go back to our local MySQL and add some other transactions. Import the remaining two salaries files, load_salaries2.dump and load_salaries3.dump. This will provide additional transactions to be synchronized and you’ll see Striim continue to add transactions as they happen without needing to do anything else.
Next Steps
We looked at a really quick and easy way to synchronize an on-premises instance of MySQL to Google Cloud SQL using Striim. At this point, you could start using the cloud database to run additional applications or do data analysis — without affecting the performance and use of your existing system.
If you open the menu on the Striim admin page, then open the apps section, and finally open this application, you’ll also see other steps you could add to this flow that support even more complex use cases, such as adding in transforms, combining multiple sources, or even splitting across targets.
To learn more about migrating from MySQL to Google Cloud SQL, check out the product page. To see how Striim can help with your move to cloud-based services, schedule a demo with a Striim technologist, or download a free trial of the platform.
Tools you need
Striim
Striim’s unified data integration and streaming platform connects clouds, data and applications.
MySQL
MySQL is an open-source relational database management system.
Google Cloud SQL
Google Cloud SQL is a fully managed relational database service for MySQL, PostgreSQL and SQL Server.
Migrate and Replicate Data from SQL Server to Snowflake with Striim
How to use Striim to migrate schemas and data from an existing SQL Server database into Snowflake
Benefits
Operational Analytics
Analyze your data in real-time without impacting the performance of your operational database.
Control Your Costs
Move data to Snowflake incrementally while controlling upload and merge intervals to optimize compute costs
Get a Live View
Use Striim CDC to stream data to Snowflake for a continuous view of your SQLServer transactions.
What is Striim?
Striim is a next generation Cloud Data Integration product that offers change data capture (CDC) enabling continuous replication from popular databases such as Oracle, SQLServer, PostgreSQL and many others.
In addition to CDC connectors, Striim has hundreds of automated adapters for file-based data (logs, xml, csv), IoT data (OPCUA, MQTT), and applications such as Salesforce and SAP. Our SQL-based stream processing engine makes it easy to enrich and normalize data before it’s written to Snowflake.
Step 1: Prepare a Snowflake Database and Launch Striim
Before migrating your data from SQL Server, you must first create a database within Snowflake to store the migrated data. After that database has been created you can launch Striim as a Snowflake partner service directly from within Snowflake.
Follow the steps below to prepare a database and launch Striim in Snowflake:
Launch Snowflake in a web browser.
Click on Databases > Create:
Enter a unique name for the database and click Finish:
Click on Partner Connect in the top right corner of the navigation bar.
Locate and click on Striim in the list of Snowflake partners. Note: you may need to first switch your user role to ACCOUNTADMIN in order to launch Striim from Snowflake:
Activate the partner account if the account has not been previously activated:
Confirm that the database you created in steps 2 and 3 above is listed in Database(s) with USAGE privilege granted and click Connect:
Note: On subsequent launches after activation has been completed for the first time, Snowflake will just prompt you to launch:
Step 2: Create a Striim Service to Host a Data Migration App
In Striim an app will be used to migrate the data. Before you can create that app, you need to first create and configure a service to host the app.
Follow the steps below to create a new Striim service:
Click on Marketplace in the top menu.
Locate the Snowflake app and click on Create:
Enter a unique name in the Name field noting the naming requirements listed:
(Optional) Click Show advanced options and specify the Service Version and Cluster Type.
Click Create. The browser will redirect to the Services screen.
Wait for the new service to enter the Running state.
Click on Launch:
The service will open in a new browser tab.
Step 3: Create a Data Migration App on the Striim Service
With the service now created and launched, you must create an app that runs on that service to perform the data migration.
Follow the steps below to create a new data migration app:
Click on Apps to display the app management screen:
Click Create app:
Click on SQL Server Database to Snowflake:
Enter a name for the new application and the namespace and click Save:
The data migration wizard is displayed:
Step 4: Prepare for Data Migration to Snowflake
In this section you will configure your app to access your source SQL Server database. As you proceed through Striim’s migration wizard, Striim will validate that it can access and fetch the metadata and data of your source SQL Server database.
Follow the steps below to migrate data using Striim’s step-by-step wizard:
Enter the details of your existing SQL Server database from which data is to be migrated and click Next:
Striim will verify that it can connect to your database and obtain metadata:
Click Next to advance to the Select Schemas screen.
Select the schemas to migrate from your SQL Server database to Snowflake and click Next:
Striim will fetch and validate metadata for each table in your database:
Click Next to advance to the Select Tables screen. Navigate through each schema on the left-hand side, and select the tables from each to migrate:
Click Next to complete the wizard. The target creation screen is displayed:
Step 5: Prepare Your Target and Migrate Your Data to Snowflake
Now that Striim can read from your source SQL Server database, you must configure Striim to write to your target Snowflake database.
Follow the steps below to prepare a Snowflake target and start the migration process:
Enter a unique name for the target in the Target Name field on the Create Snowflake Target(s) screen.
Ensure Input From is set to the stream you created using the steps in the previous sections. Note that the name will be in the form of+ _OutputStream.
Prepare the URL of the target Snowflake database: copy the following URL into the Connection URL field and replace YOUR_HOST with the base host domain assigned by Snowflake to your account, and YOUR_DATABASE with the name of your database:
Enter your credentials corresponding to your Snowflake account into the Username and Password fields.
(Optional) Modify which tables to migrate by configuring the table name(s) listed in the Tables field. By default, the tables listed will be based on those specified in the steps from the previous section and include the % as a wildcard character:
Click Next. Striim will recreate the schema(s) in your Snowflake database:
Click Next after target creation is complete. Striim will begin migrating your data to Snowflake and will provide a detailed Application Progress popup showing how the migration is progressing:
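The % wildcard mentioned for the Tables field can be reasoned about with a short Python sketch. It assumes that % matches any run of characters (including none) and that matching is case-sensitive; both are assumptions for illustration, and the table names are hypothetical. It is not Striim's matching code.

```python
import re


def matches(pattern, table_name):
    """Return True when table_name fits a pattern that uses % as a wildcard."""
    # Escape the literal parts and join them with ".*" where % appeared.
    regex = ".*".join(re.escape(part) for part in pattern.split("%"))
    return re.fullmatch(regex, table_name) is not None


tables = ["dbo.orders", "dbo.order_items", "sales.orders"]
selected = [name for name in tables if matches("dbo.%", name)]
print(selected)
```

Narrowing the pattern, for example to a single schema, is a simple way to limit which tables the migration touches.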
Wrapping Up: Start Your Free Trial
Our tutorial showed you how easy it is to migrate data from SQLServer to Snowflake, a leading cloud data warehouse. Once your data has been migrated, Striim enables continuous, real-time updates via Change Data Capture.
For instances where changes continue to be made to the data in your source database, Striim enables zero-downtime, zero-data loss migrations to Snowflake.
As always, feel free to reach out to our integration experts to schedule a demo, or try Striim for free here.
Tools you need
Striim
Striim’s unified data integration and streaming platform connects clouds, data and applications.
PostgreSQL
PostgreSQL is an open-source relational database management system.
Snowflake
Snowflake is a cloud-native relational data warehouse that offers flexible and scalable architecture for storage, compute and cloud services.
Streaming Data Integration Tutorial: Adding a Kafka Stream to a Real-Time Data Pipeline
Connect your streaming pipelines to Apache Kafka seamlessly for maximum organizational adoption of real-time data
Benefits
Turn Your Database into a Stream: Use non-intrusive CDC to Kafka to create persistent streams that can be accessed by multiple consumers.
Empower Your Teams: Give teams across your organization a real-time view of your Oracle database transactions.
Get Analytics-Ready Data: Get your data ready for analytics before it lands in the cloud. Streaming SQL scales in memory to keep your data moving.
Overview
This is the second post in a two-part blog series discussing how to stream database changes into Kafka. You can read part one here. We will discuss adding a Kafka target to the CDC source from the previous post. The application will ingest database changes (inserts, updates, and deletes) from the PostgreSQL source tables and deliver them to Kafka to continuously update a Kafka topic.
What is Kafka?
Apache Kafka is a popular distributed, fault-tolerant, high-performance messaging system.
Why use Striim with Kafka?
The Striim platform enables you to ingest data into Kafka, process it for different consumers, analyze, visualize, and distribute to a broad range of systems on-premises and in the cloud with an intuitive UI and SQL-based language for easy and fast development.
Step 1: How to add a Kafka Target to a Striim Dataflow
From the Striim Apps page, click on the app that we created in the previous blog post and select Manage Flow.
MyPostgreSQL-CDC App
This will open your application in the Flow Designer.
MyPostgreSQL-CDC app data flow.
To write to Kafka, we need to add a Target component to the data flow. Click on the data stream, then on the plus (+) button, and select “Connect next Target component” from the menu.
Connecting a target component to the data flow.
Step 2: Enter the Target Info
The next step is to specify how to write data to the target. In the New Target ADAPTER drop-down, select Kafka Writer Version 0.11.0, and enter a few connection properties, including the target name, topic, and broker URL.
Configuring the Kafka target.
Step 3: Data Formatting
Different Kafka consumers may have different requirements for the data format. When writing to Kafka in Striim, you can choose the data format with the FORMATTER drop-down and its optional configuration properties. Striim supports JSON, Delimited, XML, Avro, and free text formats. In this case, we select the JSONFormatter.
Configuring the Kafka target FORMATTER
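To make the difference between formats concrete, here is a minimal sketch that renders the same row as JSON and as delimited text using only the Python standard library. It does not reproduce Striim's formatters or their exact output: the row, its column names, and both helper functions are hypothetical and serve only to show why a consumer has to know which format a topic carries.

```python
import csv
import io
import json

# Hypothetical row captured from a source table (illustrative values only).
row = {"id": 7, "name": "Ada", "city": "Paris"}

def to_json(record: dict) -> str:
    """Render the record as a self-describing JSON object."""
    return json.dumps(record, sort_keys=True)

def to_delimited(record: dict, delimiter: str = ",") -> str:
    """Render only the values, in column order, separated by a delimiter."""
    buffer = io.StringIO()
    csv.writer(buffer, delimiter=delimiter).writerow(record.values())
    # csv.writer appends a line terminator; strip it for a single-line message.
    return buffer.getvalue().rstrip("\r\n")

print(to_json(row))
print(to_delimited(row))
print(to_delimited(row, delimiter="|"))
```

The JSON form carries the column names with every message, which is convenient for consumers that do not know the schema in advance. The delimited form is more compact, but it relies on producer and consumer agreeing on column order.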
Step 4: Deploying and Starting the Data Flow
The resulting data flow can now be modified, deployed, and started through the UI. To run the application, you must first deploy it: click on the ‘Created’ dropdown and select ‘Deploy App’ to show the Deploy UI.
Deploying the app
The application can be deployed to all nodes, any one node, or predefined groups in a Striim cluster. The default is the least used node.
Deployment node selection
After deployment, the application is ready to start. Select Start App to start it.
Starting the app
Step 5: Testing the Data Flow
You can use the PostgreSQL to Kafka sample integration application to insert, delete, and update rows in the PostgreSQL CDC source table. You should then see data flowing in the UI, indicated by a number of msgs/s. (Note that the messages are sent quickly, so the rate soon returns to 0.)
Testing the streaming data flow
If you now click on the data stream in the middle and click on the eye icon, you can preview the data flowing between PostgreSQL and Kafka. Here you can see the data, metadata (these are all updates) and before values (what the data was before the update).
Previewing the data flowing from PostgreSQL to Kafka
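A downstream consumer can use the before values to work out what an update actually changed. The sketch below assumes a JSON message with "metadata", "data", and "before" fields holding column-to-value mappings. That layout, the field names, and the sample values are hypothetical, chosen to mirror the three parts visible in the preview; check the messages in your own topic before relying on them.

```python
import json

# Hypothetical JSON message for an UPDATE; the real layout may differ.
message = json.dumps({
    "metadata": {"OperationName": "UPDATE", "TableName": "public.customers"},
    "data": {"id": 7, "name": "Ada", "city": "Berlin"},
    "before": {"id": 7, "name": "Ada", "city": "Paris"},
})

def changed_columns(raw_message: str) -> dict:
    """Map each changed column to its (old, new) pair.

    Inserts and deletes are assumed to carry no usable "before" image
    here, so every column present in "data" is reported with an old
    value of None in that case.
    """
    event = json.loads(raw_message)
    before = event.get("before") or {}
    data = event.get("data") or {}
    return {
        column: (before.get(column), new_value)
        for column, new_value in data.items()
        if before.get(column) != new_value
    }

print(changed_columns(message))
```

For the sample message, only the city column is reported, with its old value Paris and its new value Berlin.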