DEV Community

Gülsen Keskin
Gülsen Keskin

Posted on

.NET Core ve Apache Kafka Kullanarak Mail Göndermek 💫 🌌 ✨

Bu yazıda Apache Kafka ve .Net Core kullanarak nasıl mail gönderebileceğimize bakacağız. İyi okumalar. 💫 🌌 ✨

Öncelikle docker kullanarak kafka ve zookeeper kurulumunu yapacağız.

Ardından bir producer ve consumer oluşturmak için kafka nuget paketini kullanarak basit bir konsol uygulaması oluşturacağız.

Kafka için öncelikle zookeeper'ı ayarlamamız gerekiyor çünkü zookeeper producer ve consumer'ları kontrol edecek.

Docker kullanarak zookeeper'ı ayarlamak için aşağıdaki komutu terminalinizde çalıştırın:
docker pull confluentinc/cp-zookeeper

Zookeeper'ı ayarladıktan sonra terminalinizde aşağıdaki komutu çalıştırarak kafka image'ını indirin.

docker pull confluentinc/cp-kafka

Şimdi ise hem zookeeper'ı hemde kafka konteynırları için kullanacağımız bir ağ oluşturmamız gerekiyor. Bunun için aşağıdaki komutu terminalinizde çalıştırın.

docker network create kafka

Artık bir zookeeper konteynır'ı oluşturabiliriz. Bunun için aşağıdaki komutu terminalinizde çalıştırın.

docker run -d --network=kafka --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper

(network alanına yeni oluşturduğumuz ağ ismini veriyoruz yani kafka, port olarak standart bağlantı noktası olan 2181 veriyoruz)

Aynı ağı tekrar kullanacak olan Kafka konteynırını oluşturmak için aşağıdaki komutu çalıştırın.

docker run -d --network=kafka --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 confluentinc/cp-kafka

iki image'ı da indirdikten sonra artık producer ve consumer'ın birbiriyle iletişim kurabilmesi için gerekli olan topic'i üretebiliriz.

Aşağıdaki komutu çalıştırarak "demo" adında bir topic oluşturun.
docker exec kafka kafka-topics --create --bootstrap-server localhost:9092 --topic demo

Zookeeper ve Kafkayı çalıştırdığımıza göre artık yeni bir konsol uygulaması oluşturarak Confluent Kafka nuget paketini yükleyebiliriz.

<PackageReference Include="Confluent.Kafka" Version="1.8.2" />

Bu proje için gerekli olan diğer paketleri de aşağıdaki gibi ekleyebilirsiniz.
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
<PackageReference Include="Nancy" Version="2.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="kafka-sharp" Version="1.4.3" />

Image description

Mail gönderiminde kullanmak için IMessageBase adında boş bir interface oluşturuyoruz.

public interface IMessageBase
{
}

Daha sonra EmailMessage adında bir class oluşturup bu interfaceden türetiyoruz.

  public class EmailMessage : IMessageBase
    {
        public string To { get; set; }
        public string Subject { get; set; }
        public string Content { get; set; }
    }
Enter fullscreen mode Exit fullscreen mode

Şimdi yapacağımız ilk şey mesajı yayınlacak olan yapımcıyı yani producer'ı oluşturmak

Producer

public class KafkaProducerHostedService : IHostedService
    {
        private readonly ILogger<KafkaProducerHostedService> _logger;
        private IProducer<Null, string> _producer;
        public KafkaProducerHostedService(ILogger<KafkaProducerHostedService> logger)
        {
            _logger = logger;
            var config = new ProducerConfig()
            {
                BootstrapServers = "localhost:9092"
            };

             //Producer iki tür paremetre alır bunlardan birincisi kendi türünden Null olur, ikincisi bir dize olur ve buradaki config'i alır
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            //burada messageList adında bir liste oluşturuyoruz ve içerisine göndermek istediğimiz mailin değerlerini veriyoruz.
            List<EmailMessage> messageList = new List<EmailMessage>();
            for (int i = 1; i < 11; i++)
            {
                var m = new EmailMessage
                {
                    Content = $"content {i} message",
                    Subject = $"subject {i} message",
                    To = "alicimail@gmail.com"
                };
                messageList.Add(m);
            }
            //daha sonra oluşturduğumuz bu listeyi gezerek producer ile "demo" adındaki topic'imize mesajları yazıyoruz.
            foreach (var message in messageList)
            {
                var value = JsonConvert.SerializeObject(message);
                _logger.LogInformation(value);
                await _producer.ProduceAsync(topic: "demo", new Message<Null, string>()
                {
                    Value = value
                }, cancellationToken);

                _producer.Flush(timeout: TimeSpan.FromSeconds(10));
                _producer.Poll(TimeSpan.FromMinutes(1));

            }
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
             //zaman uyumsuzluğu durumunda producer'ı elden çıkarır :)
            _producer?.Dispose();
            return Task.CompletedTask;
        }
    }
Enter fullscreen mode Exit fullscreen mode

Mail

Mail göndermek için SMTP kullanacağız bunun için Mail adında bir class oluşturuyoruz ve SendMail adında bir method ekliyoruz.

  public static class Mail
    {
        public static void SendMail(EmailMessage message)
        {
            MailMessage m = new MailMessage("göndericimail@gmail.com", message.To);
            m.Subject = message.Subject;
            m.Body = message.Content;

            SmtpClient client = new SmtpClient("smtp.gmail.com", 587);
            client.EnableSsl = true;
            client.Credentials = new System.Net.NetworkCredential("göndericimail@gmail.com", "sifre");
            try
            {
                client.Send(m);
                Console.WriteLine("Mail gönderildi");
            }
            catch (SmtpException ex)
            {
                Console.WriteLine("Exception caught in SendErrorLog: {0}",
                    ex.ToString());
            }
        }
    }
Enter fullscreen mode Exit fullscreen mode

Consumer

Consumer için kafka-sharp nuget paketini kullanıyoruz.

 public class KafkaConsumerHostedService : IHostedService
    {
        private readonly ILogger<KafkaConsumerHostedService> _logger;
        private ClusterClient _cluster;
        public KafkaConsumerHostedService(ILogger<KafkaConsumerHostedService> logger)
        {
            _logger = logger;
            //cluster client oluşturuyoruz
            _cluster = new ClusterClient(new Configuration
            { Seeds = "localhost:9092" }, new ConsoleLogger());

        }
        public Task StartAsync(CancellationToken cancellationToken)
        {
 //mesajları burada tüketiyoruz producer'a verdiğimiz topic isminin aynısını buraya da veriyoruz ve bu topic'den producer'ın yazdığı mesajları okuyoruz.

_cluster.ConsumeFromLatest(topic: "demo");
            _cluster.MessageReceived += record =>
            {
                _logger.LogInformation($"Received: {Encoding.UTF8.GetString(record.Value as byte[])}");

                string value = Encoding.UTF8.GetString(record.Value as byte[]);

                JavaScriptSerializer ser = new JavaScriptSerializer();
                var r = ser.Deserialize<List<EmailMessage>>(value);
                var b = ser.Deserialize<EmailMessage>(value);

                Mail.SendMail(b);
            };
            return Task.CompletedTask;
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cluster?.Dispose();
            return Task.CompletedTask;
        }
    }
Enter fullscreen mode Exit fullscreen mode

Consumer'ı da oluşturduğumuza göre artık servislerimizi çalıştırabiliriz.

class Program
{
    static void Main(string[] args)
    {
        //Ana makinemizi çalıştırıyoruz
        CreateHostBuilder(args).Build().Run();
    }

    //Ana bilgisayar oluşturucumuz olacak
    private static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((context, collection) =>
            {
                collection.AddHostedService<KafkaConsumerHostedService>();
                collection.AddHostedService<KafkaProducerHostedService>();
            });
}
Enter fullscreen mode Exit fullscreen mode

docker image:
zookeeper: https://hub.docker.com/r/confluentinc/cp-zookeeper
kafka: https://hub.docker.com/r/confluentinc/cp-kafka/

reference

Top comments (0)