Generating Dummy Data for Event Hubs or Blob Storage

Kevin Mack (documentednerd) ・ 3 min read

So I was working on this as part of another project, and I thought I would share. One of the most annoying aspects of building data pipelines is getting test data to verify the pipeline's results.

Nothing overly groundbreaking, but I thought this might be useful for anyone trying to push data into a data pipeline, whether that's blob storage or an event hub.

What I did was build a small generic utility that generates text files full of JSON objects, and then parses those files, putting their contents onto an event hub.

For the sake of reuse, I decoupled the event hub code so I could get more utility out of it, and implemented the generator as a .NET Core console application. Below is the method for generating the files:

static async Task Main(string[] args)
{
    var builder = new ConfigurationBuilder()
        .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
    var configuration = builder.Build();

    var appSettings = new AppSettings();

    ConfigurationBinder.Bind(configuration.GetSection("AppSettings"), appSettings);

    for (var f = 0; f < appSettings.NumberOfFiles; f++)
    {
        var fileName = $"{appSettings.FilePrefix}-{f}-{DateTime.Now.ToString("MM-dd-yyyy-hh-mm-ss")}.txt";

        Console.WriteLine($"Creating file - {fileName}");

        //Create records for entry
        var list = new List<LogEntryModel>();
        for (var x = 0; x < appSettings.MaxNumberOfRecords; x++)
        {
            var logEntry = new LogEntryModel();

            logEntry.LogDateTime = DateTime.Now;
            logEntry.LogMessage = $"Test {x} - {DateTime.Now.ToString("MM-dd-yyyy-hh-mm-ss")}";
            logEntry.SequenceNumber = x;

            list.Add(logEntry);

            Console.WriteLine($"Creating line entry - {logEntry.LogMessage}");

            var randomTime = RandomNumber(1, appSettings.MaxWaitBetweenEntries);

            Console.WriteLine($"Thread sleep for {randomTime}");
            Thread.Sleep(randomTime);
            Console.WriteLine("Sleep over - Processing file");
        }

        var filePath = $@"C:\temp\{fileName}";

        //Create text file
        using (StreamWriter file = File.CreateText(filePath))
        {
            JsonSerializer serializer = new JsonSerializer();
            serializer.Serialize(file, list);
            Console.WriteLine("Pushing Json to file");
        }

        //Push to blob storage
        BlobServiceClient blobServiceClient = new BlobServiceClient(appSettings.BlobConnectionString);

        //Name of the container to upload to
        string containerName = "logs";

        //Create the container (if needed) and get a container client object
        var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
        await containerClient.CreateIfNotExistsAsync();

        BlobClient blobClient = containerClient.GetBlobClient(fileName);

        Console.WriteLine("Pushing File to Blob Storage");
        using (FileStream uploadFile = File.OpenRead(filePath))
        {
            await blobClient.UploadAsync(uploadFile, true);
        }

        Console.WriteLine("File Uploaded to Blob storage");

        var randomFileTime = RandomNumber(1, appSettings.MaxWaitBetweenFiles);
        Console.WriteLine($"Thread going to sleep for - {randomFileTime}");
        Thread.Sleep(randomFileTime);
        Console.WriteLine("Thread sleep down, moving onto next file");

        Console.WriteLine($"Started Deleting file {filePath}");
        File.Delete(filePath);
        Console.WriteLine($"Finished Deleting file {filePath}");
    }

    Console.WriteLine("All Files Processed and uploaded.");
}


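The configuration binding above expects an `AppSettings` section whose keys match the properties the loops reference (bound to a plain `AppSettings` class with int properties for the counts and waits, and string properties for the prefix and connection string). The original post doesn't show that file, so this is a sketch with made-up values; only the key names come from the code:

```json
{
  "AppSettings": {
    "FilePrefix": "log",
    "NumberOfFiles": 5,
    "MaxNumberOfRecords": 100,
    "MaxWaitBetweenEntries": 500,
    "MaxWaitBetweenFiles": 5000,
    "BlobConnectionString": "<your-storage-account-connection-string>"
  }
}
```

The wait values are treated as milliseconds, since they feed directly into `Thread.Sleep`.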

In addition to staggering the entries, it writes its progress to the console in an easily readable format. Below is the method I use to generate the random numbers:

private static readonly Random _random = new Random();

static int RandomNumber(int min, int max)
{
    return _random.Next(min, max);
}


Overall, nothing too special, but it at least provides an easy way of generating the JSON objects required for pumping data through a pipeline.

Below is the data model I use for this, but it could easily be swapped for any data model you like, with some random elements:

public class LogEntryModel
{
    public DateTime LogDateTime { get; set; }
    public string LogMessage { get; set; }
    public int SequenceNumber { get; set; }
}

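Because the generator serializes the whole list in one go, each file holds a single JSON array of these objects. A file with two entries would look roughly like this (the values are illustrative):

```json
[
  {
    "LogDateTime": "2020-01-01T12:00:00",
    "LogMessage": "Test 0 - 01-01-2020-12-00-00",
    "SequenceNumber": 0
  },
  {
    "LogDateTime": "2020-01-01T12:00:01",
    "LogMessage": "Test 1 - 01-01-2020-12-00-01",
    "SequenceNumber": 1
  }
]
```

That array shape is why the back-end reader deserializes each blob into a `List<LogEntryModel>` rather than reading it line by line.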

Now on the back end, I needed to take these blob files and parse them, which I did with the following:

using (var sr = new StreamReader(logFile, Encoding.UTF8))
{
    var str = sr.ReadToEnd();

    var logs = JsonConvert.DeserializeObject<List<LogEntryModel>>(str);

    await using (var producerClient = new EventHubProducerClient(connectionString, hubName))
    {
        using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

        foreach (var logEntry in logs)
        {
            var txt = JsonConvert.SerializeObject(logEntry);

            //TryAdd returns false when the batch is full, so don't ignore the result
            if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(txt))))
            {
                log.LogWarning($"Batch is full - entry {logEntry.SequenceNumber} was not added.");
            }
        }

        await producerClient.SendAsync(eventBatch);
        log.LogInformation($"Log of {name} with {logs.Count} rows processed.");
    }
}


Anyway, I hope you find this helpful for getting data pushed into your pipeline.

