После нескольких часов чтения документации AMQ я мог сделать это сам.
Ниже приведен полный код, который в настоящее время работает, но, очевидно, требует немного TLC, все предложения приветствуются.
using System;
using System.Text;
using System.Data.SqlClient;
using Apache.NMS;
using Serilog;
namespace AMQ_ConsoleApp
{
class Program
{
private const string AMQ_URI = "activemq:ssl://abc.net";
private const string AMQ_Queue = "test";
private const string AMQ_User = "userId";
private const string AMQ_Pwd = "password";
static void Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.WriteTo.Console(
outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level:u3}] {Message:lj}{NewLine}{Exception}"
)
.WriteTo.File("AMQ_SSIS_Connector.log"
, rollingInterval: RollingInterval.Day
, outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level:u3}] {Message:lj}{NewLine}{Exception}"
, retainedFileCountLimit:30
)
.CreateLogger();
try
{ Log.Information("#### BEGIN #####");
IConnectionFactory factory = new NMSConnectionFactory(new Uri(AMQ_URI));
IConnection Q_connection = factory.CreateConnection(AMQ_User, AMQ_Pwd);
ISession Q_session = Q_connection.CreateSession();
Log.Debug("Attempting to connect to Queue {Queue}", AMQ_Queue);
Q_connection.Start();
IDestination destination = Q_session.GetQueue(AMQ_Queue);
IMessageConsumer consumer = Q_session.CreateConsumer(destination);
IMessage message;
while (true)
{
Log.Information("______________________________");
Log.Information("Awaiting new message...");
message = consumer.Receive();
if (message != null)
{
ITextMessage textMessage = message as ITextMessage;
if (!string.IsNullOrEmpty(textMessage.Text))
{
Log.Information("Reading message with ID : " + textMessage.NMSMessageId);
WriteToDB(textMessage);
}
}
}
}
catch (Exception ex)
{
Log.Error(ex.ToString());
throw;
}
//ShutDown(Q_session, Q_connection);
}
public static void WriteToDB(ITextMessage msg)
{
try
{
const string SQL_Datasource = "(localdb)\\LocalDBv14";
const string SQL_InitialCatalog = "Customer";
const string SQL_User = "";
const string SQL_User_Pwd = "";
const string SQL_TargetTable = "TableA";
const string SQL_Schema = "dbo";
SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();
builder.DataSource = SQL_Datasource;
builder.InitialCatalog = SQL_InitialCatalog;
builder.IntegratedSecurity = true; // Disable for Named SQLServer A/c
StringBuilder sb_SQL = new StringBuilder();
sb_SQL.Append("INSERT INTO ");
sb_SQL.Append("[");
sb_SQL.Append(SQL_Schema);//
sb_SQL.Append("].[");
sb_SQL.Append(SQL_TargetTable);//
sb_SQL.Append("] (");
sb_SQL.Append("[" + "MessageID" + "]");//Fields
sb_SQL.Append(",[" + "Message" + "]");//Fields
sb_SQL.Append(",[" + "AMQ_URI" + "]");//Fields
sb_SQL.Append(",[" + "AMQ_Queue" + "]");//Fields
sb_SQL.Append(",[" + "AMQ_Type" + "]");//Fields
sb_SQL.Append(",[" + "AMQ_Timestamp" + "]");//Fields
sb_SQL.Append(",[" + "RECORD_INSERTED_AT" + "]");//Fields
sb_SQL.Append(",[" + "RECORD_INSERTED_BY" + "]");//Fields
sb_SQL.Append(") VALUES (");
sb_SQL.Append("'" + msg.NMSMessageId + "'");//Data
sb_SQL.Append(",'" + msg.Text + "'");//Data
sb_SQL.Append(",'" + AMQ_URI + "'");//Data
sb_SQL.Append(",'" + msg.NMSDestination.ToString() + "'");//Data
sb_SQL.Append(",'" + msg.NMSType + "'");//Data
sb_SQL.Append("," + msg.NMSTimestamp );//Data
sb_SQL.Append("," + "getdate()" + "");//Data
sb_SQL.Append(",'" + "SSIS_User" + "'");//Data
sb_SQL.Append(")");
// Connect to SQL
Log.Information("Connecting to SQL Server...{Server}", SQL_Datasource);
using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
{
int ret_rows = 0;
Log.Information("Opening DB Connection...{Connection}", SQL_Datasource+"."+SQL_InitialCatalog+"."+SQL_Schema);
connection.Open();
Log.Information("Inserting data into...{Table}", SQL_TargetTable);
using (SqlCommand command = new SqlCommand(sb_SQL.ToString(), connection))
ret_rows = command.ExecuteNonQuery();
if (ret_rows > 0)
{
Log.Information("Data committed to DB successfully, Inserted {records} records." , ret_rows);
//DequeueMessage(msg);
}
else {
Log.Fatal("Data commit to DB Failed.");
}
Log.Information("Closing DB connection...");
connection.Close();
Log.Information("DB Connection closed.");
}
}
catch (SqlException e)
{
Log.Error(e.ToString());
}
}
private static void DequeueMessage(IMessage message)
{
try
{
message.Acknowledge();
Log.Information("Message with ID {msg} de-queued from AMQ.", message.NMSMessageId);
}
catch (Exception ex)
{
Log.Error(ex.ToString());
}
}
private static void ShutDown(ISession session, IConnection connection)
{
Log.Information("Ending AMQ Session");
session.Close();
Log.Information("Closing AMQ Connection");
connection.Close();
}
}
}