My Event Hub Upload Task Failed With Task Canceled

Azure Event Hub is a data streaming platform and event ingestion service. An event hub can receive streaming data from one or multiple sources (event producers), and that data can be saved and processed by one or multiple consumers. Event data can be captured to Azure Blob Storage or Azure Data Lake Storage for long-term processing. An event hub can also be integrated with Azure Stream Analytics and Azure Functions.

Some of the important terms which you need to understand before starting an Azure Event Hub implementation include:

Event producer

Producers are responsible for generating the event data and sending it to the event hub. One or multiple producers can send data to the event hub.

Event receivers/consumers

Consumers are responsible for listening to the event data from the provided event hub. One or multiple consumers can read one partition or all of the event data from the event hub.

Consumer group

Event consumers can be grouped based on business logic. Consumer clients need the name of the consumer group in order to read event data from the event hub.

Partition

As data is sent to the event hub as a stream, the event hub uses partitions so that consumers can either read a single partition of the event data, identified by its partition ID, or read all event data. The number of partitions is directly related to the number of consumers that are going to listen to the event data.

Partition key

Used to generate the hash that decides which partition an event is stored in. Events sent with the same partition key end up in the same partition.
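
The idea behind a partition key can be sketched in a few lines of C#. This is only an illustration under stated assumptions: the class name PartitionKeySketch and the use of SHA-256 are made up for the example, and Event Hub applies its own internal hash function on the service side. The point it shows is that a stable hash of the key, wrapped into the partition range, always sends the same key to the same partition.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Illustration only: Event Hub uses its own internal hashing.
// This sketch just demonstrates "same key -> same partition".
public static class PartitionKeySketch
{
    // Maps a partition key to a partition index in [0, partitionCount).
    public static int PickPartition(string partitionKey, int partitionCount)
    {
        if (partitionKey == null) throw new ArgumentNullException(nameof(partitionKey));
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        using (SHA256 sha = SHA256.Create())
        {
            // A cryptographic hash is stable across processes, unlike
            // string.GetHashCode(), which is randomized in .NET Core.
            byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(partitionKey));

            // Take the first four bytes as an unsigned number and wrap it
            // into the available partition range.
            uint value = BitConverter.ToUInt32(hash, 0);
            return (int)(value % (uint)partitionCount);
        }
    }
}
```

For example, PartitionKeySketch.PickPartition("user-42", 2) returns either 0 or 1, and it returns the same value every time it is called with that key.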

More details can be found in the Azure documentation on the official Microsoft site.

Where to use it:

  • Application monitoring
  • Anomaly detection
  • Streaming data capture and analysis

Some real-world applications which can be used with Azure Event Hub

Reading sensor data

Suppose an Automatic Traffic System has been installed in a city. The application records traffic data in real time with its cameras and sensors. In a normal situation the automated traffic system works as per schedule, but in an abnormal situation such as a traffic jam or a natural disaster it should be able to respond with an automatic mechanism that redirects road traffic to other routes. The system should also be able to alert the traffic police to handle the situation. Because sensors all over the city (multiple event sources) generate a huge amount of data at the same time (maybe ten million events or more per second), a normal system will not be able to capture all the events.

Here Azure Event Hub can ingest all the streaming data coming from all the traffic signal centers (city squares) and make it available for analysis, based on which the Automatic Traffic System can take a decision. So, in this case, the real-time data is captured by the sensors of the automated traffic system and sent to the Azure event hub for storage and analysis, based on which the system can make a decision.

Ecommerce application that captures user data to boost ecommerce business

Ecommerce applications capture user data such as user ID, mobile number, device identification, location, the items the user has searched for, and user preferences. Based on this data, product promotions can be placed in the user's notification area and can also be displayed in the advertisement area of the user's social sites. Users can access ecommerce applications from different devices such as mobiles, desktops, laptops, and tablets, so the application must be able to capture all user data in real time. Again, Azure Event Hub can capture all the event data, save it, and pass it to other systems for analysis, based on which product promotions can be sent to an individual user's devices.

Nowadays, applications are not only expected to execute business logic properly (such as purchase requests); they must also be able to take the business to the next level by providing information from which they can predict and suggest items that a user is likely to buy from the ecommerce platform in the future.

Here is what we are going to learn:

Azure Event Hub implementation using a .NET Core console application.

We are going to use an ecommerce application in this demonstration.

Azure Event Hub Implementation Using .Net Core Console App

How to create an Event Hub in Azure

In order to create an event hub, an Event Hub namespace is needed first.

  • Log in to the Azure portal and navigate to Azure Event Hub.
  • Now click the Add button to create a namespace.

  • The rest of the configuration can be left at the default settings. Now review and create the namespace.

Once the Event Hub namespace is created, we can proceed with event hub creation. Navigate to the newly created Event Hub namespace and click on it.

Click on Event Hub and fill in the mandatory fields. As we are using the Basic tier for the Event Hub namespace, some options will be disabled.

The Capture option cannot be configured as it is not available in the Basic tier. Partition count is an important property and it is directly related to the event consumers. In this demo we are implementing two consumers to listen to the event hub.

After creating the namespace and the event hub we need to create shared access policies (SAS) to access the event hub instance from our console application.

There are three claims in a shared access policy. One policy can have all three claims or a combination of any two. Depending on what the application needs, I suggest you create policies based on responsibilities.

SAS claims

  • Manage
  • Send
  • Listen

For this demo, two policies were created, one for producers with the Send claim and one for consumers with the Listen claim. Please note that policies can be defined at the Event Hub instance level as well as at the Event Hub namespace level. In this demo, policies are defined at the namespace level.
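
Since the producer and the consumers each get their own policy, each side also has its own connection string. One way to keep those keys out of the source code is to read them from environment variables. The following is a minimal sketch, not part of the original demo: the class name EventHubSettings and the variable names EVENTHUB_SEND_CONNECTION and EVENTHUB_LISTEN_CONNECTION are hypothetical and can be replaced with whatever fits your setup.

```csharp
using System;

// Hypothetical helper: the class and variable names are assumptions
// made for this sketch, not something the demo project defines.
public static class EventHubSettings
{
    public const string SendVariable = "EVENTHUB_SEND_CONNECTION";
    public const string ListenVariable = "EVENTHUB_LISTEN_CONNECTION";

    // Returns the connection string stored in the given environment
    // variable, or throws if the variable is missing or empty.
    public static string GetConnectionString(string variableName)
    {
        string value = Environment.GetEnvironmentVariable(variableName);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Environment variable '{variableName}' is not set.");
        }
        return value;
    }
}
```

The producer would then call EventHubSettings.GetConnectionString(EventHubSettings.SendVariable) and the consumers EventHubSettings.GetConnectionString(EventHubSettings.ListenVariable), so a leaked Listen key cannot be used to send events and vice versa.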

In this example, I will be covering the event producer and event consumers only. I have used .NET Core 3.1 with Visual Studio 2019. It is always good practice to implement code with the latest version.

We are going to implement an ecommerce application which collects user data from user devices and sends it to an event hub. The user data contains user info, device info, and the items which the user wants to buy in the near future and is searching for in the ecommerce application. We have created two consumers that listen to the event hub and consume the event data. The event data can be passed on to any analysis application to generate business leads.

There are three parts in the console application.

Entry point or Main method

Code from Program.cs:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    namespace AzureEventHubMutliProducerConsumer {
        class Program {
            static void Main(string[] args) {
                Program program = new Program();
                Task[] tasks = new Task[3];
                // Task 0: the producer that sends events to the event hub.
                tasks[0] = Task.Run(() => {
                    Thread.Sleep(1000);
                    program.RunProducer();
                });
                // Task 1: the consumer that reads events from all partitions.
                tasks[1] = Task.Run(() => {
                    Thread.Sleep(1000);
                    program.RunEventHubConsumerReadEvent();
                });
                // Task 2: the consumer that reads events from one partition only.
                tasks[2] = Task.Run(() => {
                    Thread.Sleep(1000);
                    program.RunEventHubConsumerReadEventPartitionEvent();
                });
                Task.WaitAll(tasks);
                Console.WriteLine("Press any key to end program");
                Console.ReadKey();
            }

            public void RunProducer() {
                EventProducer eventProducer = new EventProducer();
                eventProducer.Init();
                eventProducer.GenerateEvent().Wait();
            }

            public void RunEventHubConsumerReadEvent() {
                EventHubConsumerClientDemo eventHubConsumer = new EventHubConsumerClientDemo();
                eventHubConsumer.ConsumerReadEvent("$Default").Wait();
            }

            public void RunEventHubConsumerReadEventPartitionEvent() {
                EventHubConsumerClientDemo eventHubConsumer = new EventHubConsumerClientDemo();
                // Read only from the partition with ID "1".
                eventHubConsumer.ConsumerReadEventPartitionEvent("$Default", "1").Wait();
            }
        }
    }

The console program creates three tasks so that the event producer and the two consumers can run in parallel.

Event Producer

Code for EventProducer.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;

    namespace AzureEventHubMutliProducerConsumer {
        public class EventProducer {
            string connectionString = "--- Get key from azure queue -> Shared access key ---";
            string eventHubName = "--- Event hub name ---";
            List<string> device = new List<string>();
            EventHubProducerClient producerClient;

            // Creates the producer client and fills the list of simulated devices.
            public void Init() {
                producerClient = new EventHubProducerClient(connectionString, eventHubName);
                device.Add("Mobile");
                device.Add("Laptop");
                device.Add("Desktop");
                device.Add("Tablet");
            }

            // Sends one single-event batch per device, alternating between partitions 0 and 1.
            public async Task GenerateEvent() {
                try {
                    int partitionId = 0;
                    foreach (var eachDevice in device) {
                        StringBuilder strBuilder = new StringBuilder();
                        var batchOptions = new CreateBatchOptions() {
                            PartitionId = partitionId.ToString()
                        };
                        using EventDataBatch generateData = await producerClient.CreateBatchAsync(batchOptions);
                        strBuilder.AppendFormat("Search triggered for iPhone 21 from device {0} ", eachDevice);
                        var eveData = new EventData(Encoding.UTF8.GetBytes(strBuilder.ToString()));
                        eveData.Properties.Add("UserId", "UserId");
                        eveData.Properties.Add("Location", "North India");
                        eveData.Properties.Add("DeviceType", eachDevice);
                        // TryAdd returns false when the event does not fit into the batch.
                        if (!generateData.TryAdd(eveData)) {
                            throw new InvalidOperationException("Event is too large for the batch.");
                        }
                        await producerClient.SendAsync(generateData);
                        // Switch to the other partition for the next device.
                        partitionId++;
                        if (partitionId > 1) partitionId = 0;
                    }
                } catch (Exception exp) {
                    Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
                }
            }
        }
    }

The Init() method initializes the EventHubProducerClient with the event hub connection string and the event hub name. It also adds four devices to the device list. In the real world, there might be millions of devices and millions of users using ecommerce at the same time. We are trying to replicate that scenario by using the device list to generate events.

The GenerateEvent() method generates event data for each device with some user data. We alternate between partition ID 0 and 1, which the consumer uses to read event data by partition ID. Event data can be sent as a single event or as a batch of events; here we are sending a batch with a partition ID.
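
When many events go into one batch, the usual pattern is to keep adding until the batch reports that it is full, send it, and start a new one. The sketch below shows that pattern without any Azure dependency so it can be run on its own. SizeLimitedBatch is a hypothetical stand-in for EventDataBatch (whose TryAdd likewise returns false when an event does not fit), and BatchingSketch.Pack is a made-up helper name; in a real producer, the place where a full batch is added to the result list is where SendAsync would be called.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for EventDataBatch: accepts events until a
// byte limit is reached, then TryAdd starts returning false.
public sealed class SizeLimitedBatch
{
    private readonly int maxBytes;
    private int usedBytes;

    public List<string> Events { get; } = new List<string>();

    public SizeLimitedBatch(int maxBytes)
    {
        this.maxBytes = maxBytes;
    }

    public bool TryAdd(string evt)
    {
        int size = Encoding.UTF8.GetByteCount(evt);
        if (usedBytes + size > maxBytes) return false;
        usedBytes += size;
        Events.Add(evt);
        return true;
    }
}

public static class BatchingSketch
{
    // Packs events into as few batches as possible while keeping their order.
    public static List<SizeLimitedBatch> Pack(IEnumerable<string> events, int maxBytes)
    {
        var batches = new List<SizeLimitedBatch>();
        var current = new SizeLimitedBatch(maxBytes);

        foreach (string evt in events)
        {
            if (current.TryAdd(evt)) continue;

            // The batch is full: close it (a real producer would send it
            // here) and retry the event on a fresh batch.
            if (current.Events.Count > 0) batches.Add(current);
            current = new SizeLimitedBatch(maxBytes);
            if (!current.TryAdd(evt))
            {
                throw new InvalidOperationException("Event is larger than the batch limit.");
            }
        }

        // Close the last, partially filled batch.
        if (current.Events.Count > 0) batches.Add(current);
        return batches;
    }
}
```

With an 8-byte limit, packing "aaaa", "bbbb", and "cc" gives two batches: the first holds "aaaa" and "bbbb", the second holds "cc".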

Event Consumer

Code for EventHubConsumerClientDemo.cs

    using Azure.Messaging.EventHubs.Consumer;
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    namespace AzureEventHubMutliProducerConsumer {
        public class EventHubConsumerClientDemo {
            string connectionString = "--- Get key from azure queue -> Shared access key ---";
            string eventHubName = "--- Event hub name ---";

            // Reads events from all partitions until the 30-second token expires.
            public async Task ConsumerReadEvent(string consumerGroup) {
                try {
                    using CancellationTokenSource cancellationSource = new CancellationTokenSource();
                    cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
                    await using EventHubConsumerClient eventConsumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
                    await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsAsync(cancellationSource.Token)) {
                        Console.WriteLine("---Execution from ConsumerReadEvent method---");
                        Console.WriteLine("------");
                        if (partitionEvent.Data != null) {
                            Console.WriteLine("Event Data received {0} ", Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray()));
                            if (partitionEvent.Data.Properties != null) {
                                foreach (var keyValue in partitionEvent.Data.Properties) {
                                    Console.WriteLine("Event data key = {0}, Event data value = {1}", keyValue.Key, keyValue.Value);
                                }
                            }
                        }
                    }
                    Console.WriteLine("ConsumerReadEvent end");
                } catch (Exception exp) {
                    // Cancellation of the token also ends up here, as a TaskCanceledException.
                    Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
                }
            }

            // Reads events from a single partition until the 30-second token expires.
            public async Task ConsumerReadEventPartitionEvent(string consumerGroup, string partitionId) {
                try {
                    using CancellationTokenSource cancellationSource = new CancellationTokenSource();
                    cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
                    await using EventHubConsumerClient eventConsumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
                    ReadEventOptions readEventOptions = new ReadEventOptions() {
                        MaximumWaitTime = TimeSpan.FromSeconds(30)
                    };
                    await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, readEventOptions, cancellationSource.Token)) {
                        Console.WriteLine("---Execution from ConsumerReadEventPartitionEvent method---");
                        Console.WriteLine("------");
                        // Data is null when the maximum wait time passes without an event.
                        if (partitionEvent.Data != null) {
                            Console.WriteLine("Event Data received {0} ", Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray()));
                            if (partitionEvent.Data.Properties != null) {
                                foreach (var keyValue in partitionEvent.Data.Properties) {
                                    Console.WriteLine("Event data key = {0}, Event data value = {1}", keyValue.Key, keyValue.Value);
                                }
                            }
                        }
                    }
                } catch (Exception exp) {
                    Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
                }
            }
        }
    }

Event consumer with ReadEventsAsync method

The ConsumerReadEvent() method takes the consumer group name as an input parameter and connects to the event hub with the connection string and event hub name. A CancellationTokenSource is used so that reading is terminated automatically once the given time period has passed. ReadEventOptions can also specify a maximum wait time, as the partition-based method does. EventHubConsumerClient is used to read events from the event hub. Its ReadEventsAsync method reads all the event data from the event hub.

    await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsAsync(cancellationSource.Token))

The rest of the method processes the event data and its properties and displays them in the console. After consuming all event data from the event hub, the consumer keeps waiting for new events until the 30-second window set on the cancellation token (counted from the CancelAfter call) has elapsed. At that point the read loop throws a TaskCanceledException, the catch block prints its message, and the method ends.
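
The cancellation behaviour can be reproduced without an event hub. In the sketch below, FakeReadEventsAsync is a hypothetical stand-in for ReadEventsAsync: it yields the events that are available and then waits for more until the token is canceled. The names CancellationSketch and ReadUntilCanceledAsync are also made up for this example. It treats the cancellation as the expected end of the read window rather than as an error, which is one reasonable way to handle it, not necessarily the only one.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical stand-in for ReadEventsAsync: yields the available
    // events, then waits for more until the token is canceled.
    public static async IAsyncEnumerable<string> FakeReadEventsAsync(
        int available,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; i < available; i++)
        {
            yield return $"event-{i}";
        }

        // No more events: block until cancellation, which makes
        // Task.Delay throw a TaskCanceledException.
        await Task.Delay(Timeout.Infinite, token);
    }

    // Reads events for the given time window and returns how many arrived.
    public static async Task<int> ReadUntilCanceledAsync(int available, TimeSpan window)
    {
        int received = 0;
        using (var cancellationSource = new CancellationTokenSource())
        {
            cancellationSource.CancelAfter(window);
            try
            {
                await foreach (string evt in FakeReadEventsAsync(available, cancellationSource.Token))
                {
                    received++;
                }
            }
            catch (OperationCanceledException)
            {
                // Expected: the read window has elapsed. TaskCanceledException
                // derives from OperationCanceledException, so it is caught here.
            }
        }
        return received;
    }
}
```

Calling CancellationSketch.ReadUntilCanceledAsync(3, TimeSpan.FromMilliseconds(200)) completes after roughly 200 milliseconds and returns 3, with no exception reaching the caller.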

Event consumer with ReadEventsFromPartitionAsync method

It is similar to the ConsumerReadEvent method; the main differences are the input parameters and the method used, which reads event data by partition ID. When defining the partition count in the Azure portal we set only two partitions, so only 0 or 1 can be the partition ID. Data is retrieved from the event hub based on the partition ID passed to the method. The other partition is not consumed by this consumer.

    await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, readEventOptions, cancellationSource.Token))

EventPosition can be set to Latest, Earliest, or another option such as a specific sequence number or enqueued time.

In summary, we are running two consumer clients in parallel. One reads all event data and the other reads only the event data of the given partition ID.

How to run applications with this code

Option 1

Prerequisites: Visual Studio 2019 and .NET Core 3.1. Update your "Shared access key" in the producer and consumer classes.

  • Open Visual Studio and click New -> Project.
  • Select .NET Core on the left and select the Console App (.NET Core) template.
  • Specify the path where the solution will be created and click the OK button.

    A new console app project will be created with a default Program.cs file.

    Now, from the NuGet package manager, search for Azure.Messaging.EventHubs.Processor and install it for the selected project.

  • Add a new .cs file to the project and name it EventProducer.cs
  • Copy the content from the section "Code for EventProducer.cs" and paste it into the EventProducer.cs file
  • Add a new .cs file to the project and name it EventHubConsumerClientDemo.cs
  • Copy the content from the section "Code for EventHubConsumerClientDemo.cs" and paste it into the EventHubConsumerClientDemo.cs file
  • Copy the content of the main entry point (only the C# code) and paste it into the Program.cs file.

Build the solution and run the application.

Option 2

Download the zip file from here and unzip it on your local system. Open the solution file, build the project, and run the application.

Thank you, and happy coding 😊😊😊😊

Source: https://www.c-sharpcorner.com/article/azure-event-hub-implementation-using-net-core-console-app/
