My Event Hub Upload Task Failed With Task Canceled
Azure Event Hubs is a data streaming platform and event ingestion service. An event hub can receive streaming data from one or multiple sources (event producers), and that data can be saved and processed by one or multiple consumers. Event data can be captured and saved to Azure Blob Storage or Azure Data Lake Storage for long-term processing. An event hub can also be integrated with Azure Stream Analytics and Azure Functions.
Some of the important terms which you need to understand before starting an Azure Event Hub implementation include:
Event producer
Producers are responsible for generating the event data and sending it to the event hub. One or multiple producers can send data to the event hub.
Event receivers/consumers
Consumers are responsible for listening to the event data from the provided event hub. One or multiple consumers can read one partition or all of the event data from the event hub.
Consumer group
Event consumers can be grouped based on business logic. Consumer clients need the name of the group in order to read event data from the event hub.
Partition
As data is sent to the event hub as a stream, the event hub uses partitions so that consumers can either read one partition of the event data based on its partition id or read all event data. The partition count is directly related to the number of consumers who are going to listen to the event data.
Partition Key
Used to generate the hash key of the event data, which decides the partition the event is stored in.
More details can be found in the Azure documentation on the official Microsoft site.
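To make the partition key idea concrete, here is a minimal sketch of how a key can be hashed to a partition index. PickPartition is a hypothetical helper written for this illustration; Event Hubs uses its own internal hash, so the actual partition chosen by the service will differ. The only point is that the same key always lands on the same partition.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class PartitionKeySketch
{
    // Hypothetical helper: maps a partition key to a partition index.
    // This is NOT the hash Event Hubs uses; it only illustrates the idea.
    public static int PickPartition(string partitionKey, int partitionCount)
    {
        using var sha = SHA256.Create();
        byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(partitionKey));
        // Treat the first four bytes as an unsigned number, then wrap to the partition count.
        uint value = BitConverter.ToUInt32(hash, 0);
        return (int)(value % (uint)partitionCount);
    }

    public static void Main()
    {
        // The repeated key "user-1" is assigned the same partition both times.
        foreach (string userId in new[] { "user-1", "user-2", "user-1" })
        {
            Console.WriteLine($"{userId} -> partition {PickPartition(userId, 2)}");
        }
    }
}
```

SHA-256 is used here instead of string.GetHashCode() because the latter is randomized per process in .NET Core and would not give a stable mapping between runs.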
Where to use:
- Application monitoring
- Anomaly detection
- Streaming data capture and analysis
Some real-world applications which can be used with Azure Event Hub
Reading sensor data
Suppose an Automatic Traffic System has been installed in a city. The application records traffic data in real time with its cameras and sensors. In a normal situation the automated traffic system works as per schedule, but if there is an abnormal situation like a traffic jam or a natural disaster, it should be able to handle this with an automatic mechanism that redirects road traffic to other routes. The system should also be able to alert the traffic police to handle the situation. As sensors generate huge amounts of data from different sources all over the city (multiple event sources) at the same time (maybe ten million events or more per second), a normal system will not be able to capture all the events.
Here Azure Event Hub can ingest all the streaming data coming from the traffic signal centers (city squares) so that it can be analyzed and the Automatic Traffic System can decide what to do. So, in this case, the real-time data is captured by the sensors of the automated traffic system and sent to the Azure event hub for storage and analysis, based on which the system can make a decision.
Ecommerce application that captures user data to boost ecommerce business
Ecommerce applications capture user data like user id, mobile number, device identification, location, the items the user has searched for and user preferences. Based on this data, product promotions can be placed in the user's notification area and can also be displayed in the advertisement area of their social sites. Users can access ecommerce applications from different devices like mobiles, desktops, laptops and tablets, and the application must be able to capture all user data in real time. Again, Azure Event Hub can capture all event data, save it, and pass it to other systems for analysis, based on which product promotions can be sent to an individual user's devices.
Nowadays, applications are not only supposed to execute business logic properly (like purchase requests); they must also be able to take the business to the next level by providing information from which applications can predict and suggest the items a user is going to buy on ecommerce platforms in the future.
Here is what we are going to learn:
Azure event hub implementation using a .NET Core console application.
We are going to use an ecommerce application in this demo.
How to create Event Hub in Azure
In order to create an event hub, an Event Hub namespace is needed first.
- Log in to the Azure portal and navigate to Azure Event Hub.
- Now click on the Add button to create a namespace.
- The rest of the configuration can be done with the default settings. Now review and create the namespace.
Once the Event Hub namespace is created, we can proceed with event hub creation. Navigate to the newly created Event Hub namespace and click on it.
Click on Event Hub and provide the mandatory fields. As we are using the basic tier option in the Event Hub namespace, some options will be disabled.
The Capture option cannot be configured as it is not available in the basic tier plan. Partition count is an important property and it is directly related to the event consumers. In this demo we are implementing two consumers to listen to the event hub.
After creating the namespace and the event hub, we need to create SAS (shared access policies) to access the event hub instance from our console application.
There are three claims in a shared access policy. One policy can have all three claims or a combination of any two. Depending on the application's needs, I suggest you create policies based on responsibilities.
SAS claims
- Manage
- Send
- Listen
For this demo, two policies were created, one for producers with the Send claim and one for consumers with the Listen claim. Please note that policies can be defined at the Event Hub instance level and at the Event Hub namespace level as well. In this demo, policies are defined at the namespace level.
In this example, I will be covering the event producer and event consumers only. I have used .NET Core 3.1 with Visual Studio 2019. It is always a good practice to implement code with the latest version.
We are going to implement an ecommerce application which collects user data from user devices and sends it to an event hub. User data contains user info, device info and the items which the user wants to buy in the near future and is searching for in ecommerce applications. We have created two consumers who are going to listen to the event hub and consume the event data. Event data can be further passed to any analysis application to generate business leads.
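Each policy gives you its own connection string, and the policy name is embedded in it as SharedAccessKeyName. The sketch below parses a connection string to check which policy a client is about to use. The namespace name `my-namespace`, the policy name `SendPolicy` and the `<key>` placeholder are made-up values for illustration; Parse is a hypothetical helper, not part of the Event Hubs SDK.

```csharp
using System;
using System.Collections.Generic;

public static class ConnectionStringSketch
{
    // Hypothetical helper: splits "Name=Value;Name=Value" pairs into a dictionary.
    public static Dictionary<string, string> Parse(string connectionString)
    {
        var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        foreach (string segment in connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries))
        {
            // Split on the first '=' only: keys are base64 and may end with '=' padding.
            int separator = segment.IndexOf('=');
            if (separator <= 0) continue;
            parts[segment.Substring(0, separator).Trim()] = segment.Substring(separator + 1).Trim();
        }
        return parts;
    }

    public static void Main()
    {
        // Placeholder values, not a real namespace or key.
        string producerConnection =
            "Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=SendPolicy;SharedAccessKey=<key>";
        Dictionary<string, string> parts = Parse(producerConnection);
        Console.WriteLine($"Policy in use: {parts["SharedAccessKeyName"]}");
    }
}
```

A check like this makes it easy to catch the mistake of handing a Listen-only connection string to the producer before any send is attempted.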
There are 3 parts in the console application.
Entry point or main method
Code from Program.cs,
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- namespace AzureEventHubMutliProducerConsumer {
- class Program {
- static void Main(string[] args) {
- Program program = new Program();
- // One task for the producer and one for each of the two consumers.
- Task[] tasks = new Task[3];
- tasks[0] = Task.Run(() => {
- Thread.Sleep(1000);
- program.RunProducer();
- });
- tasks[1] = Task.Run(() => {
- Thread.Sleep(1000);
- program.RunEventHubConsumerReadEvent();
- });
- tasks[2] = Task.Run(() => {
- Thread.Sleep(1000);
- program.RunEventHubConsumerReadEventPartitionEvent();
- });
- // Block until the producer and both consumers have finished.
- Task.WaitAll(tasks);
- Console.WriteLine("Press any key to end program");
- Console.ReadKey();
- }
- public void RunProducer() {
- EventProducer eventProducer = new EventProducer();
- eventProducer.Init();
- // GenerateEvent() is the send method defined in EventProducer.cs.
- eventProducer.GenerateEvent().Wait();
- }
- public void RunEventHubConsumerReadEvent() {
- EventHubConsumerClientDemo eventHubConsumer = new EventHubConsumerClientDemo();
- eventHubConsumer.ConsumerReadEvent("$Default").Wait();
- }
- public void RunEventHubConsumerReadEventPartitionEvent() {
- EventHubConsumerClientDemo eventHubConsumer = new EventHubConsumerClientDemo();
- // Read only partition "1" through the default consumer group.
- eventHubConsumer.ConsumerReadEventPartitionEvent("$Default", "1").Wait();
- }
- }
- }
Three tasks are started by the console program so that the event producer and the two consumers can run in parallel.
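As a side note, the same "run three things in parallel and wait for all of them" pattern can be written without Thread.Sleep and .Wait() by using an async Main and Task.WhenAll. The sketch below is only an illustration of that pattern: RunWorkerAsync is a hypothetical stand-in that just delays, where the real application would call the producer and consumer methods.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelRunSketch
{
    // Hypothetical stand-in for the producer or a consumer; it only waits and reports.
    public static async Task<string> RunWorkerAsync(string name, int delayMs)
    {
        await Task.Delay(delayMs);
        return $"{name} finished";
    }

    // Starts all three workers first, then waits for all of them together.
    public static async Task<string[]> RunAllAsync()
    {
        Task<string>[] workers =
        {
            RunWorkerAsync("producer", 30),
            RunWorkerAsync("consumer (all partitions)", 20),
            RunWorkerAsync("consumer (partition 1)", 10)
        };
        // Results come back in the order the tasks were listed, not the order they completed.
        return await Task.WhenAll(workers);
    }

    public static async Task Main()
    {
        foreach (string line in await RunAllAsync())
        {
            Console.WriteLine(line);
        }
    }
}
```

With this shape no thread is blocked while waiting, and an exception from any worker surfaces when the combined task is awaited.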
Event Producer
Code for EventProducer.cs
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading.Tasks;
- using Azure.Messaging.EventHubs;
- using Azure.Messaging.EventHubs.Producer;
- namespace AzureEventHubMutliProducerConsumer {
- public class EventProducer {
- string connectionString = "-- Get key from Azure -> Shared access key --";
- string eventHubName = "-- Event hub name --";
- EventDataBatch generateData;
- List<string> device = new List<string>();
- EventHubProducerClient producerClient;
- public void Init() {
- producerClient = new EventHubProducerClient(connectionString, eventHubName);
- device.Add("Mobile");
- device.Add("Laptop");
- device.Add("Desktop");
- device.Add("Tablet");
- }
- public async Task GenerateEvent() {
- try {
- int partitionId = 0;
- foreach (var eachDevice in device) {
- StringBuilder strBuilder = new StringBuilder();
- // Pin the batch to one partition so a consumer can read it by partition id.
- var batchOptions = new CreateBatchOptions() {
- PartitionId = partitionId.ToString()
- };
- // Await the async calls instead of blocking with .Result and .Wait().
- generateData = await producerClient.CreateBatchAsync(batchOptions);
- strBuilder.AppendFormat("Search triggered for iPhone 21 from device {0} ", eachDevice);
- var eveData = new EventData(Encoding.UTF8.GetBytes(strBuilder.ToString()));
- eveData.Properties.Add("UserId", "UserId");
- eveData.Properties.Add("Location", "North India");
- eveData.Properties.Add("DeviceType", eachDevice);
- // TryAdd returns false when the event does not fit in the batch.
- if (!generateData.TryAdd(eveData)) throw new InvalidOperationException("Event is too large for the batch.");
- await producerClient.SendAsync(generateData);
- // Alternate between partition 0 and partition 1.
- partitionId++;
- if (partitionId > 1) partitionId = 0;
- }
- } catch (Exception exp) {
- Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
- }
- }
- }
- }
The Init() method initializes the EventHubProducerClient with the event hub connection key and the event hub name. It also creates the device list and adds four devices to it. In the real world, there might be millions of devices and millions of users using ecommerce at the same time. We are trying to replicate that scenario by using the device list to generate events.
The GenerateEvent() method generates event data for each device with some user data. We alternate between partition id 0 and 1, which the consumer later uses to read event data by partition id. Event data can be sent as a single event or as a batch of events; here we are sending a batch with a partition id.
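The way the producer spreads devices over the two partitions can be seen in isolation in the small sketch below. It does not talk to Event Hubs at all; Assign is a hypothetical helper that only reproduces the alternating partition id logic, assuming two partitions as in this demo.

```csharp
using System;
using System.Collections.Generic;

public static class RoundRobinSketch
{
    // Hypothetical helper: pairs each device with the next partition id in turn.
    public static List<(string Device, string PartitionId)> Assign(
        IEnumerable<string> devices, int partitionCount)
    {
        var result = new List<(string Device, string PartitionId)>();
        int next = 0;
        foreach (string device in devices)
        {
            result.Add((device, next.ToString()));
            // Wrap back to 0 after the last partition.
            next = (next + 1) % partitionCount;
        }
        return result;
    }

    public static void Main()
    {
        var devices = new[] { "Mobile", "Laptop", "Desktop", "Tablet" };
        foreach (var (device, partitionId) in Assign(devices, 2))
        {
            Console.WriteLine($"{device} -> partition {partitionId}");
        }
        // Prints:
        // Mobile -> partition 0
        // Laptop -> partition 1
        // Desktop -> partition 0
        // Tablet -> partition 1
    }
}
```

So a consumer that reads only partition 1 should see the Laptop and Tablet events, while a consumer reading all partitions sees all four.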
Event Consumer
Code for EventHubConsumerClientDemo.cs
- using Azure.Messaging.EventHubs.Consumer;
- using System;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace AzureEventHubMutliProducerConsumer {
- public class EventHubConsumerClientDemo {
- string connectionString = "-- Get key from Azure -> Shared access key --";
- string eventHubName = "-- Event hub name --";
- public async Task ConsumerReadEvent(string consumerGroup) {
- try {
- // Stop reading 30 seconds after the consumer starts.
- CancellationTokenSource cancellationSource = new CancellationTokenSource();
- cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
- await using EventHubConsumerClient eventConsumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
- // Reads events from all partitions until the token is cancelled.
- await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsAsync(cancellationSource.Token)) {
- Console.WriteLine("---Execution from ConsumerReadEvent method---");
- Console.WriteLine("------");
- if (partitionEvent.Data != null) {
- Console.WriteLine("Event Data received {0} ", Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray()));
- if (partitionEvent.Data.Properties != null) {
- foreach (var keyValue in partitionEvent.Data.Properties) {
- Console.WriteLine("Event data key = {0}, Event data value = {1}", keyValue.Key, keyValue.Value);
- }
- }
- }
- }
- Console.WriteLine("ConsumerReadEvent end");
- } catch (Exception exp) {
- // Cancellation of the token also ends up here as a task-canceled exception.
- Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
- }
- }
- public async Task ConsumerReadEventPartitionEvent(string consumerGroup, string partitionId) {
- try {
- CancellationTokenSource cancellationSource = new CancellationTokenSource();
- cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
- await using EventHubConsumerClient eventConsumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
- // With MaximumWaitTime set, an empty event (Data == null) is returned when nothing arrives in time.
- ReadEventOptions readEventOptions = new ReadEventOptions() {
- MaximumWaitTime = TimeSpan.FromSeconds(30)
- };
- // Reads events from a single partition only, starting with newly arriving events.
- await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, readEventOptions, cancellationSource.Token)) {
- Console.WriteLine("---Execution from ConsumerReadEventPartitionEvent method---");
- Console.WriteLine("------");
- if (partitionEvent.Data != null) {
- Console.WriteLine("Event Data received {0} ", Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray()));
- if (partitionEvent.Data.Properties != null) {
- foreach (var keyValue in partitionEvent.Data.Properties) {
- Console.WriteLine("Event data key = {0}, Event data value = {1}", keyValue.Key, keyValue.Value);
- }
- }
- }
- }
- } catch (Exception exp) {
- Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
- }
- }
- }
- }
Event consumer with ReadEventsAsync method
The ConsumerReadEvent() method takes the consumer group name as its input parameter and connects to the event hub with the connection key and event hub name. A CancellationTokenSource is used so that the read is terminated automatically once the provided time period has passed. ReadEventOptions, which is used in the partition-based method, can also set a maximum wait time. To read events from the event hub, EventHubConsumerClient is used. The ReadEventsAsync method reads all the event data from the event hub.
- await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsAsync(cancellationSource.Token))
The rest of the method processes the event data and its properties and displays them in the console. After consuming the event data from the event hub, the consumer keeps waiting until the 30 seconds set on the cancellation token have elapsed. At that point the read loop throws a task-canceled exception, the catch block prints its message, and the method returns so the application can terminate.
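Because the cancellation is expected here, it can be handled separately from real errors. The sketch below shows the pattern without any Event Hubs dependency: FakeReadEventsAsync is a hypothetical stand-in for ReadEventsAsync that yields two events and then waits until the token is cancelled. The 200 ms window is an arbitrary value chosen so the example finishes quickly.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical stand-in for ReadEventsAsync: two events, then an endless wait.
    public static async IAsyncEnumerable<string> FakeReadEventsAsync(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        yield return "event-1";
        yield return "event-2";
        // Throws TaskCanceledException when the token is cancelled.
        await Task.Delay(Timeout.Infinite, token);
    }

    public static async Task<(int Count, bool Cancelled)> ConsumeAsync(TimeSpan window)
    {
        using var cancellationSource = new CancellationTokenSource(window);
        int count = 0;
        try
        {
            await foreach (string evt in FakeReadEventsAsync(cancellationSource.Token))
            {
                Console.WriteLine($"Received {evt}");
                count++;
            }
            return (count, false);
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException,
            // so the expected end of the read window is handled here.
            Console.WriteLine("Read window elapsed; stopping the consumer.");
            return (count, true);
        }
    }

    public static async Task Main()
    {
        var (count, cancelled) = await ConsumeAsync(TimeSpan.FromMilliseconds(200));
        Console.WriteLine($"events={count}, cancelled={cancelled}");
    }
}
```

Catching OperationCanceledException before the general Exception handler keeps the "Task canceled" message out of the error output when the timeout is simply doing its job.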
Event consumer with ReadEventsFromPartitionAsync method
It is similar to the ConsumerReadEvent method; the main differences are the input parameters and the method used, which reads event data by partition id. While defining the partitions for the event hub in the Azure portal we set only two partitions, so only 0 or 1 can be the partition id. Data is retrieved from the event hub based on the partition id provided to the method. The other partition is not consumed by this consumer.
- await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, readEventOptions, cancellationSource.Token))
EventPosition can be set to Latest or Earliest, or to a specific point such as a sequence number or an enqueued time.
In summary, we are running two consumer clients in parallel. One reads all event data and the other reads only the event data of the given partition id.
How to run applications with this code
Option 1
Prerequisite: Visual Studio 2019 and .NET Core 3.1. Update your "Shared access key" in the producer and consumer classes.
- Open Visual Studio and click on New -> Project.
- Select .NET Core on the left side and select the Console App (.NET Core) template.
- Enter the path where the solution will be created and click on the OK button.
A new console app project will be created with a default Program.cs file.
Now, from the NuGet package manager, search for Azure.Messaging.EventHubs.Processor and install it for the selected project.
- Add a new .cs file to the project and name it EventProducer.cs
- Copy the content from the section "Code for EventProducer.cs" and paste it into the EventProducer.cs file
- Add a new .cs file to the project and name it EventHubConsumerClientDemo.cs
- Copy the content from the section "Code for EventHubConsumerClientDemo.cs" and paste it into the EventHubConsumerClientDemo.cs file
- Copy the content of the main entry point (only the C# code) and paste it into the Program.cs file.
Build the solution and run the application.
Option 2
Download the zip file from here and unzip it on your local system. Open the solution file, build the project, and run the application.
Thank you, and happy coding 😊😊😊😊
Source: https://www.c-sharpcorner.com/article/azure-event-hub-implementation-using-net-core-console-app/