Bu yazıda Apache Kafka ve .Net Core kullanarak nasıl mail gönderebileceğimize bakacağız. İyi okumalar. 💫 🌌 ✨
Öncelikle docker kullanarak kafka ve zookeeper kurulumunu yapacağız.
Ardından bir producer ve consumer oluşturmak için kafka nuget paketini kullanarak basit bir konsol uygulaması oluşturacağız.
Kafka için öncelikle zookeeper'ı ayarlamamız gerekiyor çünkü zookeeper producer ve consumer'ları kontrol edecek.
Docker kullanarak zookeeper'ı ayarlamak için aşağıdaki komutu terminalinizde çalıştırın:
docker pull confluentinc/cp-zookeeper
Zookeeper'ı ayarladıktan sonra terminalinizde aşağıdaki komutu çalıştırarak kafka image'ını indirin.
docker pull confluentinc/cp-kafka
Şimdi ise hem zookeeper'ı hemde kafka konteynırları için kullanacağımız bir ağ oluşturmamız gerekiyor. Bunun için aşağıdaki komutu terminalinizde çalıştırın.
docker network create kafka
Artık bir zookeeper konteynır'ı oluşturabiliriz. Bunun için aşağıdaki komutu terminalinizde çalıştırın.
docker run -d --network=kafka --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
(network alanına yeni oluşturduğumuz ağ ismini veriyoruz yani kafka, port olarak standart bağlantı noktası olan 2181 veriyoruz)
Aynı ağı tekrar kullanacak olan Kafka konteynırını oluşturmak için aşağıdaki komutu çalıştırın.
docker run -d --network=kafka --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 confluentinc/cp-kafka
iki image'ı da indirdikten sonra artık producer ve consumer'ın birbiriyle iletişim kurabilmesi için gerekli olan topic'i üretebiliriz.
Aşağıdaki komutu çalıştırarak "demo" adında bir topic oluşturun.
docker exec kafka kafka-topics --create --bootstrap-server localhost:9092 --topic demo
Zookeeper ve Kafkayı çalıştırdığımıza göre artık yeni bir konsol uygulaması oluşturarak Confluent Kafka nuget paketini yükleyebiliriz.
<PackageReference Include="Confluent.Kafka" Version="1.8.2" />
Bu proje için gerekli olan diğer paketleri de aşağıdaki gibi ekleyebilirsiniz.
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
<PackageReference Include="Nancy" Version="2.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="kafka-sharp" Version="1.4.3" />
Mail gönderiminde kullanmak için IMessageBase adında boş bir interface oluşturuyoruz.
public interface IMessageBase
{
}
Daha sonra EmailMessage adında bir class oluşturup bu interfaceden türetiyoruz.
public class EmailMessage : IMessageBase
{
public string To { get; set; }
public string Subject { get; set; }
public string Content { get; set; }
}
Şimdi yapacağımız ilk şey mesajı yayınlacak olan yapımcıyı yani producer'ı oluşturmak
Producer
public class KafkaProducerHostedService : IHostedService
{
private readonly ILogger<KafkaProducerHostedService> _logger;
private IProducer<Null, string> _producer;
public KafkaProducerHostedService(ILogger<KafkaProducerHostedService> logger)
{
_logger = logger;
var config = new ProducerConfig()
{
BootstrapServers = "localhost:9092"
};
//Producer iki tür paremetre alır bunlardan birincisi kendi türünden Null olur, ikincisi bir dize olur ve buradaki config'i alır
_producer = new ProducerBuilder<Null, string>(config).Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
//burada messageList adında bir liste oluşturuyoruz ve içerisine göndermek istediğimiz mailin değerlerini veriyoruz.
List<EmailMessage> messageList = new List<EmailMessage>();
for (int i = 1; i < 11; i++)
{
var m = new EmailMessage
{
Content = $"content {i} message",
Subject = $"subject {i} message",
To = "alicimail@gmail.com"
};
messageList.Add(m);
}
//daha sonra oluşturduğumuz bu listeyi gezerek producer ile "demo" adındaki topic'imize mesajları yazıyoruz.
foreach (var message in messageList)
{
var value = JsonConvert.SerializeObject(message);
_logger.LogInformation(value);
await _producer.ProduceAsync(topic: "demo", new Message<Null, string>()
{
Value = value
}, cancellationToken);
_producer.Flush(timeout: TimeSpan.FromSeconds(10));
_producer.Poll(TimeSpan.FromMinutes(1));
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
//zaman uyumsuzluğu durumunda producer'ı elden çıkarır :)
_producer?.Dispose();
return Task.CompletedTask;
}
}
Mail göndermek için SMTP kullanacağız bunun için Mail adında bir class oluşturuyoruz ve SendMail adında bir method ekliyoruz.
public static class Mail
{
public static void SendMail(EmailMessage message)
{
MailMessage m = new MailMessage("göndericimail@gmail.com", message.To);
m.Subject = message.Subject;
m.Body = message.Content;
SmtpClient client = new SmtpClient("smtp.gmail.com", 587);
client.EnableSsl = true;
client.Credentials = new System.Net.NetworkCredential("göndericimail@gmail.com", "sifre");
try
{
client.Send(m);
Console.WriteLine("Mail gönderildi");
}
catch (SmtpException ex)
{
Console.WriteLine("Exception caught in SendErrorLog: {0}",
ex.ToString());
}
}
}
Consumer
Consumer için kafka-sharp nuget paketini kullanıyoruz.
public class KafkaConsumerHostedService : IHostedService
{
private readonly ILogger<KafkaConsumerHostedService> _logger;
private ClusterClient _cluster;
public KafkaConsumerHostedService(ILogger<KafkaConsumerHostedService> logger)
{
_logger = logger;
//cluster client oluşturuyoruz
_cluster = new ClusterClient(new Configuration
{ Seeds = "localhost:9092" }, new ConsoleLogger());
}
public Task StartAsync(CancellationToken cancellationToken)
{
//mesajları burada tüketiyoruz producer'a verdiğimiz topic isminin aynısını buraya da veriyoruz ve bu topic'den producer'ın yazdığı mesajları okuyoruz.
_cluster.ConsumeFromLatest(topic: "demo");
_cluster.MessageReceived += record =>
{
_logger.LogInformation($"Received: {Encoding.UTF8.GetString(record.Value as byte[])}");
string value = Encoding.UTF8.GetString(record.Value as byte[]);
JavaScriptSerializer ser = new JavaScriptSerializer();
var r = ser.Deserialize<List<EmailMessage>>(value);
var b = ser.Deserialize<EmailMessage>(value);
Mail.SendMail(b);
};
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cluster?.Dispose();
return Task.CompletedTask;
}
}
Consumer'ı da oluşturduğumuza göre artık servislerimizi çalıştırabiliriz.
class Program
{
static void Main(string[] args)
{
//Ana makinemizi çalıştırıyoruz
CreateHostBuilder(args).Build().Run();
}
//Ana bilgisayar oluşturucumuz olacak
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((context, collection) =>
{
collection.AddHostedService<KafkaConsumerHostedService>();
collection.AddHostedService<KafkaProducerHostedService>();
});
}
docker image:
zookeeper: https://hub.docker.com/r/confluentinc/cp-zookeeper
kafka: https://hub.docker.com/r/confluentinc/cp-kafka/
Top comments (0)