Confluent Kafka - Schema Registry registrieren bzw. HttpRequestException

  • C#
  • .NET (FX) 4.5–4.8

    Confluent Kafka - Schema Registry registrieren bzw. HttpRequestException

    Hallo,

    ich arbeite an einer Verbindung zweier Programme via Kafka. Ich habe ein Programm (Consumer) in Python, dass mir Integer Werte anzeigen kann, die von einem Producer gesendet werden. Zunächst habe ich einen einfachen C# Producer gebaut, ohne ein Schema zu verwenden. Das schöne ist, dass mir mein Python Consumer dennoch mitteilt, dass irgendwas gesendet wird. Allerding kann ich den gesendeten Integer Wert nicht auslesen lassen, da ich das Schema in dieser Implementierung nicht angegeben habe. Zusätzlich habe ich auch einen Consumer in C# geschrieben, der ohne Schema wunderbar mit dem Producer funktioniert. (Ohne Schema funktionieren C# Producer/Consumer perfekt!)

    Jetzt ist das Ziel einen Producer zu bauen, der mit einem definierten Schema umgehen kann. Das Problem an dieser Stelle ich jedoch, dass ich eine Fehlermeldung bekommen. Die genaue Fehlermeldung habe ich einmal unten angegeben. Meine Vermutung ist, dass die Fehlermeldung resultiert, da die schemaRegistryUrl nicht von mir weiter definiert wird. Diese muss vermutlich vorher irgendwie registriert werden, hier ist dann die Frage womit das bewerkstelligt werden kann (eine ideale Lösung wäre über den Code). Und genau das ist der Punkt, wo ich eine Frage zu habe, wie kann denn eine Schema Registry URL registriert werden? Die Dokumentation gibt hier leider keine guten Hinweise. Ich habe hier schon etwas gefunden, allerdings gilt das nur für Python es geht aber in die Richtung, mit der die Fehlermeldung dann vermutlich behoben werden kann (stackoverflow.com/questions/61…-in-kafka-schema-registry).

    Ich würde mich sehr über Antworten freuen! :)

    Fehlermeldung:
    Spoiler anzeigen

    Quellcode

    1. Exception thrown: 'Confluent.Kafka.ProduceException`2' in mscorlib.dll
    2. Confluent.Kafka.ProduceException`2[Confluent.Kafka.Null,Avro.Generic.GenericRecord]: Local: Value serialization error ---> System.Net.Http.HttpRequestException: [http://localhost:8081/] HttpRequestException: Fehler beim Senden der Anforderung.
    3. bei Confluent.SchemaRegistry.RestService.<ExecuteOnOneInstanceAsync>d__6.MoveNext()
    4. --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde ---
    5. bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    6. bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    7. bei Confluent.SchemaRegistry.RestService.<RequestAsync>d__7`1.MoveNext()
    8. --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde ---
    9. bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    10. bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    11. bei Confluent.SchemaRegistry.RestService.<RegisterSchemaAsync>d__15.MoveNext()
    12. --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde ---
    13. bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    14. bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    15. bei Confluent.SchemaRegistry.CachedSchemaRegistryClient.<RegisterSchemaAsync>d__19.MoveNext()
    16. --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde ---
    17. bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    18. bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    19. bei Confluent.SchemaRegistry.Serdes.GenericSerializerImpl.<Serialize>d__8.MoveNext()
    20. --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde ---
    21. bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    22. bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    23. bei Confluent.SchemaRegistry.Serdes.AvroSerializer`1.<SerializeAsync>d__6.MoveNext()
    24. --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde ---
    25. bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    26. bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    27. bei System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
    28. bei Confluent.Kafka.Producer`2.<ProduceAsync>d__51.MoveNext()
    29. --- Ende der internen Ausnahmestapelüberwachung ---
    30. bei Confluent.Kafka.Producer`2.<ProduceAsync>d__51.MoveNext()
    31. --- Ende der Stapelüberwachung vom vorhergehenden Ort, an dem die Ausnahme ausgelöst wurde ---
    32. bei System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
    33. bei System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
    34. bei System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
    35. bei ProducerV1.ProducerV1Prog.<Main>d__0.MoveNext() in C:\...\Producer.cs:Zeile 53.



    Lösung: Tatsächlich lag das Problem in der nicht korrekt definierten Schema Registry, hier gibt es zwei verschiedene URLs die inter/extern genutzt werden können.

    Dieser Beitrag wurde bereits 3 mal editiert, zuletzt von „VB.neter0101“ ()