DEV Community

G├╝lsen Keskin
G├╝lsen Keskin

Posted on

.NET Core ve Apache Kafka Kullanarak Mail G├Ândermek ­čĺź ­čîî ÔťĘ

Bu yaz─▒da Apache Kafka ve .Net Core kullanarak nas─▒l mail g├Ânderebilece─čimize bakaca─č─▒z. ─░yi okumalar. ­čĺź ­čîî ÔťĘ

├ľncelikle docker kullanarak kafka ve zookeeper kurulumunu yapaca─č─▒z.

Ard─▒ndan bir producer ve consumer olu┼čturmak i├žin kafka nuget paketini kullanarak basit bir konsol uygulamas─▒ olu┼čturaca─č─▒z.

Kafka i├žin ├Âncelikle zookeeper'─▒ ayarlamam─▒z gerekiyor ├ž├╝nk├╝ zookeeper producer ve consumer'lar─▒ kontrol edecek.

Docker kullanarak zookeeper'─▒ ayarlamak i├žin a┼ča─č─▒daki komutu terminalinizde ├žal─▒┼čt─▒r─▒n:
docker pull confluentinc/cp-zookeeper

Zookeeper'─▒ ayarlad─▒ktan sonra terminalinizde a┼ča─č─▒daki komutu ├žal─▒┼čt─▒rarak kafka image'─▒n─▒ indirin.

docker pull confluentinc/cp-kafka

┼×imdi ise hem zookeeper'─▒ hemde kafka konteyn─▒rlar─▒ i├žin kullanaca─č─▒m─▒z bir a─č olu┼čturmam─▒z gerekiyor. Bunun i├žin a┼ča─č─▒daki komutu terminalinizde ├žal─▒┼čt─▒r─▒n.

docker network create kafka

Art─▒k bir zookeeper konteyn─▒r'─▒ olu┼čturabiliriz. Bunun i├žin a┼ča─č─▒daki komutu terminalinizde ├žal─▒┼čt─▒r─▒n.

docker run -d --network=kafka --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper

(network alan─▒na yeni olu┼čturdu─čumuz a─č ismini veriyoruz yani kafka, port olarak standart ba─člant─▒ noktas─▒ olan 2181 veriyoruz)

Ayn─▒ a─č─▒ tekrar kullanacak olan Kafka konteyn─▒r─▒n─▒ olu┼čturmak i├žin a┼ča─č─▒daki komutu ├žal─▒┼čt─▒r─▒n.

docker run -d --network=kafka --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 confluentinc/cp-kafka

iki image'─▒ da indirdikten sonra art─▒k producer ve consumer'─▒n birbiriyle ileti┼čim kurabilmesi i├žin gerekli olan topic'i ├╝retebiliriz.

A┼ča─č─▒daki komutu ├žal─▒┼čt─▒rarak "demo" ad─▒nda bir topic olu┼čturun.
docker exec kafka kafka-topics --create --bootstrap-server localhost:9092 --topic demo

Zookeeper ve Kafkay─▒ ├žal─▒┼čt─▒rd─▒─č─▒m─▒za g├Âre art─▒k yeni bir konsol uygulamas─▒ olu┼čturarak Confluent Kafka nuget paketini y├╝kleyebiliriz.

<PackageReference Include="Confluent.Kafka" Version="1.8.2" />

Bu proje i├žin gerekli olan di─čer paketleri de a┼ča─č─▒daki gibi ekleyebilirsiniz.
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
<PackageReference Include="Nancy" Version="2.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="kafka-sharp" Version="1.4.3" />

Image description

Mail g├Ânderiminde kullanmak i├žin IMessageBase ad─▒nda bo┼č bir interface olu┼čturuyoruz.

public interface IMessageBase
{
}

Daha sonra EmailMessage ad─▒nda bir class olu┼čturup bu interfaceden t├╝retiyoruz.

  public class EmailMessage : IMessageBase
    {
        public string To { get; set; }
        public string Subject { get; set; }
        public string Content { get; set; }
    }
Enter fullscreen mode Exit fullscreen mode

┼×imdi yapaca─č─▒m─▒z ilk ┼čey mesaj─▒ yay─▒nlacak olan yap─▒mc─▒y─▒ yani producer'─▒ olu┼čturmak

Producer

public class KafkaProducerHostedService : IHostedService
    {
        private readonly ILogger<KafkaProducerHostedService> _logger;
        private IProducer<Null, string> _producer;
        public KafkaProducerHostedService(ILogger<KafkaProducerHostedService> logger)
        {
            _logger = logger;
            var config = new ProducerConfig()
            {
                BootstrapServers = "localhost:9092"
            };

             //Producer iki t├╝r paremetre al─▒r bunlardan birincisi kendi t├╝r├╝nden Null olur, ikincisi bir dize olur ve buradaki config'i al─▒r
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            //burada messageList ad─▒nda bir liste olu┼čturuyoruz ve i├žerisine g├Ândermek istedi─čimiz mailin de─čerlerini veriyoruz.
            List<EmailMessage> messageList = new List<EmailMessage>();
            for (int i = 1; i < 11; i++)
            {
                var m = new EmailMessage
                {
                    Content = $"content {i} message",
                    Subject = $"subject {i} message",
                    To = "alicimail@gmail.com"
                };
                messageList.Add(m);
            }
            //daha sonra olu┼čturdu─čumuz bu listeyi gezerek producer ile "demo" ad─▒ndaki topic'imize mesajlar─▒ yaz─▒yoruz.
            foreach (var message in messageList)
            {
                var value = JsonConvert.SerializeObject(message);
                _logger.LogInformation(value);
                await _producer.ProduceAsync(topic: "demo", new Message<Null, string>()
                {
                    Value = value
                }, cancellationToken);

                _producer.Flush(timeout: TimeSpan.FromSeconds(10));
                _producer.Poll(TimeSpan.FromMinutes(1));

            }
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
             //zaman uyumsuzlu─ču durumunda producer'─▒ elden ├ž─▒kar─▒r :)
            _producer?.Dispose();
            return Task.CompletedTask;
        }
    }
Enter fullscreen mode Exit fullscreen mode

Mail

Mail g├Ândermek i├žin SMTP kullanaca─č─▒z bunun i├žin Mail ad─▒nda bir class olu┼čturuyoruz ve SendMail ad─▒nda bir method ekliyoruz.

  public static class Mail
    {
        public static void SendMail(EmailMessage message)
        {
            MailMessage m = new MailMessage("g├Ândericimail@gmail.com", message.To);
            m.Subject = message.Subject;
            m.Body = message.Content;

            SmtpClient client = new SmtpClient("smtp.gmail.com", 587);
            client.EnableSsl = true;
            client.Credentials = new System.Net.NetworkCredential("g├Ândericimail@gmail.com", "sifre");
            try
            {
                client.Send(m);
                Console.WriteLine("Mail g├Ânderildi");
            }
            catch (SmtpException ex)
            {
                Console.WriteLine("Exception caught in SendErrorLog: {0}",
                    ex.ToString());
            }
        }
    }
Enter fullscreen mode Exit fullscreen mode

Consumer

Consumer i├žin kafka-sharp nuget paketini kullan─▒yoruz.

 public class KafkaConsumerHostedService : IHostedService
    {
        private readonly ILogger<KafkaConsumerHostedService> _logger;
        private ClusterClient _cluster;
        public KafkaConsumerHostedService(ILogger<KafkaConsumerHostedService> logger)
        {
            _logger = logger;
            //cluster client olu┼čturuyoruz
            _cluster = new ClusterClient(new Configuration
            { Seeds = "localhost:9092" }, new ConsoleLogger());

        }
        public Task StartAsync(CancellationToken cancellationToken)
        {
 //mesajlar─▒ burada t├╝ketiyoruz producer'a verdi─čimiz topic isminin ayn─▒s─▒n─▒ buraya da veriyoruz ve bu topic'den producer'─▒n yazd─▒─č─▒ mesajlar─▒ okuyoruz.

_cluster.ConsumeFromLatest(topic: "demo");
            _cluster.MessageReceived += record =>
            {
                _logger.LogInformation($"Received: {Encoding.UTF8.GetString(record.Value as byte[])}");

                string value = Encoding.UTF8.GetString(record.Value as byte[]);

                JavaScriptSerializer ser = new JavaScriptSerializer();
                var r = ser.Deserialize<List<EmailMessage>>(value);
                var b = ser.Deserialize<EmailMessage>(value);

                Mail.SendMail(b);
            };
            return Task.CompletedTask;
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cluster?.Dispose();
            return Task.CompletedTask;
        }
    }
Enter fullscreen mode Exit fullscreen mode

Consumer'─▒ da olu┼čturdu─čumuza g├Âre art─▒k servislerimizi ├žal─▒┼čt─▒rabiliriz.

class Program
{
    static void Main(string[] args)
    {
        //Ana makinemizi ├žal─▒┼čt─▒r─▒yoruz
        CreateHostBuilder(args).Build().Run();
    }

    //Ana bilgisayar olu┼čturucumuz olacak
    private static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((context, collection) =>
            {
                collection.AddHostedService<KafkaConsumerHostedService>();
                collection.AddHostedService<KafkaProducerHostedService>();
            });
}
Enter fullscreen mode Exit fullscreen mode

docker image:
zookeeper: https://hub.docker.com/r/confluentinc/cp-zookeeper
kafka: https://hub.docker.com/r/confluentinc/cp-kafka/

reference

Top comments (0)